diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index d7603fbcff82..c1e3644e2657 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -33,7 +33,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case AlterTableAddColumnsStatement(
          nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
-      cols.foreach(c => failNullType(c.dataType))
       val changes = cols.map { col =>
         TableChange.addColumn(
           col.name.toArray,
@@ -46,7 +45,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
 
     case AlterTableReplaceColumnsStatement(
          nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
-      cols.foreach(c => failNullType(c.dataType))
       val changes: Seq[TableChange] = loadTable(catalog, tbl.asIdentifier) match {
         case Some(table) =>
           // REPLACE COLUMNS deletes all the existing columns and adds new columns specified.
@@ -68,7 +66,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
 
     case c @ CreateTableStatement(
          NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
-      assertNoNullTypeInSchema(c.tableSchema)
       CreateV2Table(
         catalog.asTableCatalog,
         tbl.asIdentifier,
@@ -80,9 +77,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
 
     case c @ CreateTableAsSelectStatement(
          NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _) =>
-      if (c.asSelect.resolved) {
-        assertNoNullTypeInSchema(c.asSelect.schema)
-      }
       CreateTableAsSelect(
         catalog.asTableCatalog,
         tbl.asIdentifier,
@@ -95,7 +89,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
 
     case c @ ReplaceTableStatement(
          NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
-      assertNoNullTypeInSchema(c.tableSchema)
       ReplaceTable(
         catalog.asTableCatalog,
         tbl.asIdentifier,
@@ -107,9 +100,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
 
     case c @ ReplaceTableAsSelectStatement(
          NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
-      if (c.asSelect.resolved) {
-        assertNoNullTypeInSchema(c.asSelect.schema)
-      }
       ReplaceTableAsSelect(
         catalog.asTableCatalog,
         tbl.asIdentifier,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 3d88d6232c74..14e445f69c39 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -1153,8 +1153,6 @@ case class AlterTableAlterColumn(
     nullable: Option[Boolean],
     comment: Option[String],
     position: Option[FieldPosition]) extends AlterTableCommand {
-  import org.apache.spark.sql.connector.catalog.CatalogV2Util._
-  dataType.foreach(failNullType)
 
   override def operation: String = "update"
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index a779e50a1f21..a9e87724feb7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -25,9 +25,8 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.catalyst.analysis.{NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, UnresolvedV2Relation}
 import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, CreateTableAsSelectStatement, CreateTableStatement, ReplaceTableAsSelectStatement, ReplaceTableStatement, SerdeInfo}
 import org.apache.spark.sql.connector.catalog.TableChange._
-import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.types.{ArrayType, DataType, MapType, NullType, StructField, StructType}
+import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.Utils
 
@@ -377,22 +376,4 @@ private[sql] object CatalogV2Util {
       .getOrElse(catalogManager.v2SessionCatalog)
       .asTableCatalog
   }
-
-  def failNullType(dt: DataType): Unit = {
-    def containsNullType(dt: DataType): Boolean = dt match {
-      case ArrayType(et, _) => containsNullType(et)
-      case MapType(kt, vt, _) => containsNullType(kt) || containsNullType(vt)
-      case StructType(fields) => fields.exists(f => containsNullType(f.dataType))
-      case _ => dt.isInstanceOf[NullType]
-    }
-    if (containsNullType(dt)) {
-      throw QueryCompilationErrors.cannotCreateTablesWithNullTypeError()
-    }
-  }
-
-  def assertNoNullTypeInSchema(schema: StructType): Unit = {
-    schema.foreach { f =>
-      failNullType(f.dataType)
-    }
-  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index f5760363343b..f31de476e986 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -1393,10 +1393,6 @@ private[spark] object QueryCompilationErrors {
     new AnalysisException("multi-part identifier cannot be empty.")
   }
 
-  def cannotCreateTablesWithNullTypeError(): Throwable = {
-    new AnalysisException(s"Cannot create tables with ${NullType.simpleString} type.")
-  }
-
   def functionUnsupportedInV2CatalogError(): Throwable = {
     new AnalysisException("function is only supported in v1 catalog")
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 5b3e269fc7be..b9c82c39bb0f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -48,7 +48,6 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
   override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
     case AlterTableAddColumnsStatement(
          nameParts @ SessionCatalogAndTable(catalog, tbl), cols) =>
-      cols.foreach(c => failNullType(c.dataType))
       loadTable(catalog, tbl.asIdentifier).collect {
         case v1Table: V1Table =>
           cols.foreach { c =>
@@ -72,7 +71,6 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
 
     case AlterTableReplaceColumnsStatement(
          nameParts @ SessionCatalogAndTable(catalog, tbl), cols) =>
-      cols.foreach(c => failNullType(c.dataType))
       val changes: Seq[TableChange] = loadTable(catalog, tbl.asIdentifier) match {
         case Some(_: V1Table) =>
           throw QueryCompilationErrors.replaceColumnsOnlySupportedWithV2TableError
@@ -196,7 +194,6 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     // session catalog and the table provider is not v2.
     case c @ CreateTableStatement(
          SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
-      assertNoNullTypeInSchema(c.tableSchema)
       val (storageFormat, provider) = getStorageFormatAndProvider(
         c.provider, c.options, c.location, c.serde, ctas = false)
       if (!isV2Provider(provider)) {
@@ -218,9 +215,6 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
 
     case c @ CreateTableAsSelectStatement(
          SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _) =>
-      if (c.asSelect.resolved) {
-        assertNoNullTypeInSchema(c.asSelect.schema)
-      }
       val (storageFormat, provider) = getStorageFormatAndProvider(
         c.provider, c.options, c.location, c.serde, ctas = true)
       if (!isV2Provider(provider)) {
@@ -251,7 +245,6 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     // session catalog and the table provider is not v2.
     case c @ ReplaceTableStatement(
          SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
-      assertNoNullTypeInSchema(c.tableSchema)
       val provider = c.provider.getOrElse(conf.defaultDataSourceName)
       if (!isV2Provider(provider)) {
         throw QueryCompilationErrors.replaceTableOnlySupportedWithV2TableError
@@ -268,9 +261,6 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
 
     case c @ ReplaceTableAsSelectStatement(
          SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
-      if (c.asSelect.resolved) {
-        assertNoNullTypeInSchema(c.asSelect.schema)
-      }
       val provider = c.provider.getOrElse(conf.defaultDataSourceName)
       if (!isV2Provider(provider)) {
         throw QueryCompilationErrors.replaceTableAsSelectOnlySupportedWithV2TableError
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index c22bb3ff6f60..2a9e1adee273 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, RowOrdering}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.connector.catalog.CatalogV2Util.assertNoNullTypeInSchema
 import org.apache.spark.sql.connector.expressions.{FieldReference, RewritableTransform}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.command.DDLUtils
@@ -277,8 +276,6 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
         "in the table definition of " + table.identifier,
         conf.caseSensitiveAnalysis)
 
-      assertNoNullTypeInSchema(schema)
-
       val normalizedPartCols = normalizePartitionColumns(schema, table)
       val normalizedBucketSpec = normalizeBucketSpec(schema, table)
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 2bca463217f7..c8a5c03bdd2d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -28,7 +28,6 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.connector.catalog.CatalogV2Util.assertNoNullTypeInSchema
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceStrategy}
@@ -227,8 +226,6 @@ case class RelationConversions(
           conf.getConf(HiveUtils.CONVERT_METASTORE_CTAS) =>
         // validation is required to be done here before relation conversion.
         DDLUtils.checkDataColNames(tableDesc.copy(schema = query.schema))
-        // This is for CREATE TABLE .. STORED AS PARQUET/ORC AS SELECT null
-        assertNoNullTypeInSchema(query.schema)
         OptimizedCreateHiveTableAsSelectCommand(
           tableDesc, query, query.output.map(_.name), mode)
     }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 3e01fcbe1644..bdc966b62ad1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.connector.FakeV2Provider
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
 import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
@@ -2394,114 +2393,47 @@ class HiveDDLSuite
     }
   }
 
-  test("SPARK-20680: do not support for null column datatype") {
-    withTable("t") {
-      withView("tabNullType") {
-        hiveClient.runSqlHive("CREATE TABLE t (t1 int)")
-        hiveClient.runSqlHive("INSERT INTO t VALUES (3)")
-        hiveClient.runSqlHive("CREATE VIEW tabNullType AS SELECT NULL AS col FROM t")
-        checkAnswer(spark.table("tabNullType"), Row(null))
-        // No exception shows
-        val desc = spark.sql("DESC tabNullType").collect().toSeq
-        assert(desc.contains(Row("col", NullType.simpleString, null)))
-      }
-    }
-
-    // Forbid CTAS with null type
+  test("SPARK-36241: support creating tables with null datatype") {
+    // CTAS with null type
     withTable("t1", "t2", "t3") {
       assertAnalysisError(
-        "CREATE TABLE t1 USING PARQUET AS SELECT null as null_col",
-        "Cannot create tables with null type")
+        "CREATE TABLE t1 USING PARQUET AS SELECT NULL AS null_col",
+        "Parquet data source does not support null data type")
 
       assertAnalysisError(
-        "CREATE TABLE t2 AS SELECT null as null_col",
-        "Cannot create tables with null type")
+        "CREATE TABLE t2 STORED AS PARQUET AS SELECT null as null_col",
+        "Unknown field type: void")
 
-      assertAnalysisError(
-        "CREATE TABLE t3 STORED AS PARQUET AS SELECT null as null_col",
-        "Cannot create tables with null type")
+      sql("CREATE TABLE t3 AS SELECT NULL AS null_col")
+      checkAnswer(sql("SELECT * FROM t3"), Row(null))
     }
 
-    // Forbid Replace table AS SELECT with null type
-    withTable("t") {
-      val v2Source = classOf[FakeV2Provider].getName
-      assertAnalysisError(
-        s"CREATE OR REPLACE TABLE t USING $v2Source AS SELECT null as null_col",
-        "Cannot create tables with null type")
-    }
-
-    // Forbid creating table with VOID type in Spark
+    // Create table with null type
     withTable("t1", "t2", "t3", "t4") {
       assertAnalysisError(
         "CREATE TABLE t1 (v VOID) USING PARQUET",
-        "Cannot create tables with null type")
-      assertAnalysisError(
-        "CREATE TABLE t2 (v VOID) USING hive",
-        "Cannot create tables with null type")
-      assertAnalysisError(
-        "CREATE TABLE t3 (v VOID)",
-        "Cannot create tables with null type")
-      assertAnalysisError(
-        "CREATE TABLE t4 (v VOID) STORED AS PARQUET",
-        "Cannot create tables with null type")
-    }
+        "Parquet data source does not support null data type")
 
-    // Forbid Replace table with VOID type
-    withTable("t") {
-      val v2Source = classOf[FakeV2Provider].getName
       assertAnalysisError(
-        s"CREATE OR REPLACE TABLE t (v VOID) USING $v2Source",
-        "Cannot create tables with null type")
-    }
-
-    // Make sure spark.catalog.createTable with null type will fail
-    val schema1 = new StructType().add("c", NullType)
-    assertHiveTableNullType(schema1)
-    assertDSTableNullType(schema1)
-
-    val schema2 = new StructType()
-      .add("c", StructType(Seq(StructField.apply("c1", NullType))))
-    assertHiveTableNullType(schema2)
-    assertDSTableNullType(schema2)
+        "CREATE TABLE t2 (v VOID) STORED AS PARQUET",
+        "Unknown field type: void")
 
-    val schema3 = new StructType().add("c", ArrayType(NullType))
-    assertHiveTableNullType(schema3)
-    assertDSTableNullType(schema3)
-
-    val schema4 = new StructType()
-      .add("c", MapType(StringType, NullType))
-    assertHiveTableNullType(schema4)
-    assertDSTableNullType(schema4)
-
-    val schema5 = new StructType()
-      .add("c", MapType(NullType, StringType))
-    assertHiveTableNullType(schema5)
-    assertDSTableNullType(schema5)
-  }
+      sql("CREATE TABLE t3 (v VOID) USING hive")
+      checkAnswer(sql("SELECT * FROM t3"), Seq.empty)
 
-  private def assertHiveTableNullType(schema: StructType): Unit = {
-    withTable("t") {
-      val e = intercept[AnalysisException] {
-        spark.catalog.createTable(
-          tableName = "t",
-          source = "hive",
-          schema = schema,
-          options = Map("fileFormat" -> "parquet"))
-      }.getMessage
-      assert(e.contains("Cannot create tables with null type"))
+      sql("CREATE TABLE t4 (v VOID)")
+      checkAnswer(sql("SELECT * FROM t4"), Seq.empty)
     }
-  }
 
-  private def assertDSTableNullType(schema: StructType): Unit = {
+    // Create table with null type using spark.catalog.createTable
     withTable("t") {
-      val e = intercept[AnalysisException] {
-        spark.catalog.createTable(
-          tableName = "t",
-          source = "json",
-          schema = schema,
-          options = Map.empty[String, String])
-      }.getMessage
-      assert(e.contains("Cannot create tables with null type"))
+      val schema = new StructType().add("c", NullType)
+      spark.catalog.createTable(
        tableName = "t",
        source = "json",
        schema = schema,
        options = Map.empty[String, String])
+      checkAnswer(sql("SELECT * FROM t"), Seq.empty)
     }
   }
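
For reference only (not part of the patch): a minimal standalone Scala sketch, assuming a Hive-enabled SparkSession, of the behavior the updated HiveDDLSuite test exercises after this change — VOID/null columns are accepted for Hive and default catalog tables, while the Parquet data source still rejects the null data type.

import org.apache.spark.sql.SparkSession

object VoidColumnExample {
  def main(args: Array[String]): Unit = {
    // Assumes Hive support is available in the running session.
    val spark = SparkSession.builder()
      .appName("void-column-example")
      .enableHiveSupport()
      .getOrCreate()

    // Now succeeds: a Hive table with a VOID column, read back as an empty result.
    spark.sql("CREATE TABLE t_void (v VOID) USING hive")
    spark.sql("SELECT * FROM t_void").show()

    // CTAS of a NULL literal into the default format also succeeds (one row containing null).
    spark.sql("CREATE TABLE t_null AS SELECT NULL AS null_col")
    spark.sql("SELECT * FROM t_null").show()

    // Still rejected: Parquet does not support the null data type.
    // spark.sql("CREATE TABLE t_parquet (v VOID) USING PARQUET")

    spark.stop()
  }
}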