Commit 517a1e6

add nested check
1 parent 0ba1242 commit 517a1e6

File tree

2 files changed: +50 -14 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala

Lines changed: 28 additions & 11 deletions
@@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.command.{CreateDataSourceTableCommand, Cre
 import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.InsertableRelation
-import org.apache.spark.sql.types.{AtomicType, DataTypes, StructType}
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils

 /**
@@ -394,38 +394,55 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
 }

 /**
- * SPARK-28443: Spark sql add exception when create field type NullType
+ * SPARK-28443: fail the DDL command if it creates a table with null-type columns
  */
 object DDLCheck extends (LogicalPlan => Unit) {

   def failAnalysis(msg: String): Unit = { throw new AnalysisException(msg) }

-  def throwWhenExistsNullType(schema: StructType): Unit = {
-    if (schema.exists(f => DataTypes.NullType.sameType(f.dataType))) {
-      failAnalysis("DataType NullType is not supported for create table ")
+  def throwWhenExistsNullType(dataType: DataType): Unit = {
+    dataType match {
+      case ArrayType(elementType, _) =>
+        throwWhenExistsNullType(elementType)
+
+      case MapType(keyType, valueType, _) =>
+        throwWhenExistsNullType(keyType)
+        throwWhenExistsNullType(valueType)
+
+      case StructType(fields) =>
+        fields.foreach { field => throwWhenExistsNullType(field.dataType) }
+
+      case other if other == NullType =>
+        failAnalysis("DataType NullType is not supported for create table")
+
+      case _ => // OK
     }
   }

+  def checkSchema(schema: StructType): Unit = {
+    schema.foreach { field => throwWhenExistsNullType(field.dataType) }
+  }
+
   override def apply(plan: LogicalPlan): Unit = {
     plan.foreach {
       case CreateTable(tableDesc, _, _) =>
-        throwWhenExistsNullType(tableDesc.schema)
+        checkSchema(tableDesc.schema)

       case CreateV2Table(_, _, tableSchema, _, _, _) =>
-        throwWhenExistsNullType(tableSchema)
+        checkSchema(tableSchema)

       // DataSourceAnalysis will convert CreateTable to CreateDataSourceTableCommand before check
       case CreateDataSourceTableCommand(table, _) =>
-        throwWhenExistsNullType(table.schema)
+        checkSchema(table.schema)

       // HiveAnalysis will convert CreateTable to CreateTableCommand before check
       case CreateTableCommand(table, _) =>
-        throwWhenExistsNullType(table.schema)
+        checkSchema(table.schema)

       case ReplaceTable(_, _, tableSchema, _, _, _) =>
-        throwWhenExistsNullType(tableSchema)
+        checkSchema(tableSchema)

-      case _ => // OK
+      case _ => // skip
     }
   }
 }
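For reference, below is a minimal standalone sketch of the recursive traversal this patch introduces, rewritten as a boolean predicate so it can be tried in a spark-shell without wiring up the analyzer rule. The name containsNullType is hypothetical; the case analysis mirrors the patched throwWhenExistsNullType, and returning a Boolean instead of throwing is the only deliberate difference.

    import org.apache.spark.sql.types._

    // Recursively walk a DataType and report whether any nested component
    // is NullType: array elements, map keys and values, struct fields, or
    // the type itself. Anything else falls through to false.
    def containsNullType(dataType: DataType): Boolean = dataType match {
      case ArrayType(elementType, _) => containsNullType(elementType)
      case MapType(keyType, valueType, _) =>
        containsNullType(keyType) || containsNullType(valueType)
      case StructType(fields) => fields.exists(f => containsNullType(f.dataType))
      case NullType => true
      case _ => false
    }

    // A NullType two levels deep (a map value inside an array) still trips
    // the check; the old top-level-only scan would have missed it.
    val schema = new StructType()
      .add("a", IntegerType)
      .add("b", ArrayType(MapType(StringType, NullType)))
    assert(schema.exists(f => containsNullType(f.dataType)))

This is also why the patch splits the entry point into checkSchema: the old method took a StructType and only looked one level down, while the recursive version has to accept any DataType.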

sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala

Lines changed: 22 additions & 3 deletions
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
 import org.apache.spark.sql.catalyst.plans.logical.Range
 import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{NullType, StructType}
+import org.apache.spark.sql.types._
 import org.apache.spark.storage.StorageLevel


@@ -547,9 +547,28 @@ class CatalogSuite
     assert(spark.table("my_temp_table").storageLevel == StorageLevel.DISK_ONLY)
   }

-  test("SPARK-28443: Spark sql add exception when create field type NullType") {
+  test("SPARK-28443: fail the DDL command if it creates a table with null-type columns") {
+    val schema1 = new StructType().add("c", NullType)
+    checkTableNullType(schema1)
+
+    val schema2 = new StructType()
+      .add("c", StructType(Seq(StructField("c1", NullType))))
+    checkTableNullType(schema2)
+
+    val schema3 = new StructType().add("c", ArrayType(NullType))
+    checkTableNullType(schema3)
+
+    val schema4 = new StructType()
+      .add("c", MapType(StringType, NullType))
+    checkTableNullType(schema4)
+
+    val schema5 = new StructType()
+      .add("c", MapType(NullType, StringType))
+    checkTableNullType(schema5)
+  }
+
+  private def checkTableNullType(schema: StructType): Unit = {
     val expectedMsg = "DataType NullType is not supported for create table"
-    val schema = new StructType().add("c", NullType)
     withTable("t") {
       val e = intercept[AnalysisException] {
         spark.catalog.createTable(
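A hedged sketch of the call path the new helper exercises, runnable inside a ScalaTest suite such as CatalogSuite (intercept comes from org.scalatest.Assertions; the table name and the parquet source here are illustrative, not taken from the commit):

    import org.apache.spark.sql.AnalysisException
    import org.apache.spark.sql.types._

    // A NullType nested in a map value: previously accepted by the
    // top-level check, now rejected by the recursive one.
    val nested = new StructType().add("c", MapType(StringType, NullType))

    val e = intercept[AnalysisException] {
      spark.catalog.createTable("t", "parquet", nested, Map.empty[String, String])
    }
    assert(e.getMessage.contains("DataType NullType is not supported for create table"))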
