@@ -33,7 +33,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case AlterTableAddColumnsStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
cols.foreach(c => failNullType(c.dataType))
val changes = cols.map { col =>
TableChange.addColumn(
col.name.toArray,
@@ -46,7 +45,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)

case AlterTableReplaceColumnsStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
cols.foreach(c => failNullType(c.dataType))
val changes: Seq[TableChange] = loadTable(catalog, tbl.asIdentifier) match {
case Some(table) =>
// REPLACE COLUMNS deletes all the existing columns and adds new columns specified.
@@ -68,7 +66,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)

case c @ CreateTableStatement(
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
assertNoNullTypeInSchema(c.tableSchema)
CreateV2Table(
catalog.asTableCatalog,
tbl.asIdentifier,
@@ -80,9 +77,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)

case c @ CreateTableAsSelectStatement(
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _) =>
if (c.asSelect.resolved) {
assertNoNullTypeInSchema(c.asSelect.schema)
}
CreateTableAsSelect(
catalog.asTableCatalog,
tbl.asIdentifier,
@@ -95,7 +89,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)

case c @ ReplaceTableStatement(
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
assertNoNullTypeInSchema(c.tableSchema)
ReplaceTable(
catalog.asTableCatalog,
tbl.asIdentifier,
@@ -107,9 +100,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)

case c @ ReplaceTableAsSelectStatement(
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
if (c.asSelect.resolved) {
assertNoNullTypeInSchema(c.asSelect.schema)
}
ReplaceTableAsSelect(
catalog.asTableCatalog,
tbl.asIdentifier,
@@ -1153,8 +1153,6 @@ case class AlterTableAlterColumn(
nullable: Option[Boolean],
comment: Option[String],
position: Option[FieldPosition]) extends AlterTableCommand {
import org.apache.spark.sql.connector.catalog.CatalogV2Util._
dataType.foreach(failNullType)

override def operation: String = "update"

@@ -25,9 +25,8 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.analysis.{NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, UnresolvedV2Relation}
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, CreateTableAsSelectStatement, CreateTableStatement, ReplaceTableAsSelectStatement, ReplaceTableStatement, SerdeInfo}
import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, NullType, StructField, StructType}
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.Utils

@@ -377,22 +376,4 @@ private[sql] object CatalogV2Util {
.getOrElse(catalogManager.v2SessionCatalog)
.asTableCatalog
}

def failNullType(dt: DataType): Unit = {
def containsNullType(dt: DataType): Boolean = dt match {
case ArrayType(et, _) => containsNullType(et)
case MapType(kt, vt, _) => containsNullType(kt) || containsNullType(vt)
case StructType(fields) => fields.exists(f => containsNullType(f.dataType))
case _ => dt.isInstanceOf[NullType]
}
if (containsNullType(dt)) {
throw QueryCompilationErrors.cannotCreateTablesWithNullTypeError()
}
}

def assertNoNullTypeInSchema(schema: StructType): Unit = {
schema.foreach { f =>
failNullType(f.dataType)
}
}
}
@@ -1393,10 +1393,6 @@ private[spark] object QueryCompilationErrors {
new AnalysisException("multi-part identifier cannot be empty.")
}

def cannotCreateTablesWithNullTypeError(): Throwable = {
new AnalysisException(s"Cannot create tables with ${NullType.simpleString} type.")
}

def functionUnsupportedInV2CatalogError(): Throwable = {
new AnalysisException("function is only supported in v1 catalog")
}
@@ -48,7 +48,6 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case AlterTableAddColumnsStatement(
nameParts @ SessionCatalogAndTable(catalog, tbl), cols) =>
cols.foreach(c => failNullType(c.dataType))
loadTable(catalog, tbl.asIdentifier).collect {
case v1Table: V1Table =>
cols.foreach { c =>
@@ -72,7 +71,6 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)

case AlterTableReplaceColumnsStatement(
nameParts @ SessionCatalogAndTable(catalog, tbl), cols) =>
cols.foreach(c => failNullType(c.dataType))
val changes: Seq[TableChange] = loadTable(catalog, tbl.asIdentifier) match {
case Some(_: V1Table) =>
throw QueryCompilationErrors.replaceColumnsOnlySupportedWithV2TableError
@@ -196,7 +194,6 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
// session catalog and the table provider is not v2.
case c @ CreateTableStatement(
SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
assertNoNullTypeInSchema(c.tableSchema)
val (storageFormat, provider) = getStorageFormatAndProvider(
c.provider, c.options, c.location, c.serde, ctas = false)
if (!isV2Provider(provider)) {
@@ -218,9 +215,6 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)

case c @ CreateTableAsSelectStatement(
SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _) =>
if (c.asSelect.resolved) {
assertNoNullTypeInSchema(c.asSelect.schema)
}
val (storageFormat, provider) = getStorageFormatAndProvider(
c.provider, c.options, c.location, c.serde, ctas = true)
if (!isV2Provider(provider)) {
@@ -251,7 +245,6 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
// session catalog and the table provider is not v2.
case c @ ReplaceTableStatement(
SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
assertNoNullTypeInSchema(c.tableSchema)
val provider = c.provider.getOrElse(conf.defaultDataSourceName)
if (!isV2Provider(provider)) {
throw QueryCompilationErrors.replaceTableOnlySupportedWithV2TableError
@@ -268,9 +261,6 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)

case c @ ReplaceTableAsSelectStatement(
SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
if (c.asSelect.resolved) {
assertNoNullTypeInSchema(c.asSelect.schema)
}
val provider = c.provider.getOrElse(conf.defaultDataSourceName)
if (!isV2Provider(provider)) {
throw QueryCompilationErrors.replaceTableAsSelectOnlySupportedWithV2TableError
@@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, RowOrdering}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogV2Util.assertNoNullTypeInSchema
import org.apache.spark.sql.connector.expressions.{FieldReference, RewritableTransform}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.command.DDLUtils
@@ -277,8 +276,6 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
"in the table definition of " + table.identifier,
conf.caseSensitiveAnalysis)

assertNoNullTypeInSchema(schema)

val normalizedPartCols = normalizePartitionColumns(schema, table)
val normalizedBucketSpec = normalizeBucketSpec(schema, table)

@@ -28,7 +28,6 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogV2Util.assertNoNullTypeInSchema
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceStrategy}
@@ -227,8 +226,6 @@ case class RelationConversions(
conf.getConf(HiveUtils.CONVERT_METASTORE_CTAS) =>
// validation is required to be done here before relation conversion.
DDLUtils.checkDataColNames(tableDesc.copy(schema = query.schema))
// This is for CREATE TABLE .. STORED AS PARQUET/ORC AS SELECT null
assertNoNullTypeInSchema(query.schema)
OptimizedCreateHiveTableAsSelectCommand(
tableDesc, query, query.output.map(_.name), mode)
}
@@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.connector.FakeV2Provider
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
@@ -2394,114 +2393,47 @@ class HiveDDLSuite
}
}

test("SPARK-20680: do not support for null column datatype") {
withTable("t") {
withView("tabNullType") {
hiveClient.runSqlHive("CREATE TABLE t (t1 int)")
hiveClient.runSqlHive("INSERT INTO t VALUES (3)")
hiveClient.runSqlHive("CREATE VIEW tabNullType AS SELECT NULL AS col FROM t")
checkAnswer(spark.table("tabNullType"), Row(null))
// No exception shows
val desc = spark.sql("DESC tabNullType").collect().toSeq
assert(desc.contains(Row("col", NullType.simpleString, null)))
}
}

// Forbid CTAS with null type
test("SPARK-36241: support creating tables with null datatype") {
// CTAS with null type
withTable("t1", "t2", "t3") {
assertAnalysisError(
"CREATE TABLE t1 USING PARQUET AS SELECT null as null_col",
"Cannot create tables with null type")
"CREATE TABLE t1 USING PARQUET AS SELECT NULL AS null_col",
"Parquet data source does not support null data type")

assertAnalysisError(
"CREATE TABLE t2 AS SELECT null as null_col",
"Cannot create tables with null type")
"CREATE TABLE t2 STORED AS PARQUET AS SELECT null as null_col",
"Unknown field type: void")
Contributor Author comment: Parquet doesn't support the null (Spark) / void (Hive) type.
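A minimal spark-shell sketch of the behavior these assertions encode, assuming a Hive-enabled session; the table names below are placeholders:

// Parquet has no encoding for Spark's NullType (Hive's void type), so CTAS into Parquet
// is still rejected, now by the data source check rather than by the analyzer.
spark.sql("CREATE TABLE null_parquet USING PARQUET AS SELECT NULL AS null_col")
// => AnalysisException: Parquet data source does not support null data type.

// A default Hive-serde CTAS can now carry the all-NULL column.
spark.sql("CREATE TABLE null_ok AS SELECT NULL AS null_col")
spark.sql("SELECT * FROM null_ok").collect()   // one row whose single column is NULL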


assertAnalysisError(
"CREATE TABLE t3 STORED AS PARQUET AS SELECT null as null_col",
"Cannot create tables with null type")
sql("CREATE TABLE t3 AS SELECT NULL AS null_col")
checkAnswer(sql("SELECT * FROM t3"), Row(null))
}

// Forbid Replace table AS SELECT with null type
withTable("t") {
val v2Source = classOf[FakeV2Provider].getName
assertAnalysisError(
s"CREATE OR REPLACE TABLE t USING $v2Source AS SELECT null as null_col",
"Cannot create tables with null type")
}

// Forbid creating table with VOID type in Spark
// Create table with null type
withTable("t1", "t2", "t3", "t4") {
assertAnalysisError(
"CREATE TABLE t1 (v VOID) USING PARQUET",
"Cannot create tables with null type")
assertAnalysisError(
"CREATE TABLE t2 (v VOID) USING hive",
"Cannot create tables with null type")
assertAnalysisError(
"CREATE TABLE t3 (v VOID)",
"Cannot create tables with null type")
assertAnalysisError(
"CREATE TABLE t4 (v VOID) STORED AS PARQUET",
"Cannot create tables with null type")
}
"Parquet data source does not support null data type")

// Forbid Replace table with VOID type
withTable("t") {
val v2Source = classOf[FakeV2Provider].getName
assertAnalysisError(
s"CREATE OR REPLACE TABLE t (v VOID) USING $v2Source",
"Cannot create tables with null type")
}

// Make sure spark.catalog.createTable with null type will fail
val schema1 = new StructType().add("c", NullType)
assertHiveTableNullType(schema1)
assertDSTableNullType(schema1)

val schema2 = new StructType()
.add("c", StructType(Seq(StructField.apply("c1", NullType))))
assertHiveTableNullType(schema2)
assertDSTableNullType(schema2)
"CREATE TABLE t2 (v VOID) STORED AS PARQUET",
"Unknown field type: void")

val schema3 = new StructType().add("c", ArrayType(NullType))
assertHiveTableNullType(schema3)
assertDSTableNullType(schema3)

val schema4 = new StructType()
.add("c", MapType(StringType, NullType))
assertHiveTableNullType(schema4)
assertDSTableNullType(schema4)

val schema5 = new StructType()
.add("c", MapType(NullType, StringType))
assertHiveTableNullType(schema5)
assertDSTableNullType(schema5)
}
sql("CREATE TABLE t3 (v VOID) USING hive")
checkAnswer(sql("SELECT * FROM t3"), Seq.empty)

private def assertHiveTableNullType(schema: StructType): Unit = {
withTable("t") {
val e = intercept[AnalysisException] {
spark.catalog.createTable(
tableName = "t",
source = "hive",
schema = schema,
options = Map("fileFormat" -> "parquet"))
}.getMessage
assert(e.contains("Cannot create tables with null type"))
sql("CREATE TABLE t4 (v VOID)")
checkAnswer(sql("SELECT * FROM t4"), Seq.empty)
}
}

private def assertDSTableNullType(schema: StructType): Unit = {
// Create table with null type using spark.catalog.createTable
withTable("t") {
val e = intercept[AnalysisException] {
spark.catalog.createTable(
tableName = "t",
source = "json",
schema = schema,
options = Map.empty[String, String])
}.getMessage
assert(e.contains("Cannot create tables with null type"))
val schema = new StructType().add("c", NullType)
spark.catalog.createTable(
tableName = "t",
source = "json",
schema = schema,
options = Map.empty[String, String])
checkAnswer(sql("SELECT * FROM t"), Seq.empty)
}
}
