@@ -34,6 +34,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case AlterTableAddColumnsStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
cols.foreach(c => failVoidType(c.dataType))
cols.foreach(c => failCharType(c.dataType))
val changes = cols.map { col =>
TableChange.addColumn(
@@ -47,6 +48,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)

case AlterTableReplaceColumnsStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
cols.foreach(c => failVoidType(c.dataType))
cols.foreach(c => failCharType(c.dataType))
val changes: Seq[TableChange] = loadTable(catalog, tbl.asIdentifier) match {
case Some(table) =>
@@ -69,6 +71,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)

case a @ AlterTableAlterColumnStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _) =>
a.dataType.foreach(failVoidType)
a.dataType.foreach(failCharType)
val colName = a.column.toArray
val typeChange = a.dataType.map { newDataType =>
@@ -145,6 +148,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)

case c @ CreateTableStatement(
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
assertNoVoidTypeInSchema(c.tableSchema)
assertNoCharTypeInSchema(c.tableSchema)
CreateV2Table(
catalog.asTableCatalog,
@@ -172,6 +176,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)

case c @ ReplaceTableStatement(
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
assertNoVoidTypeInSchema(c.tableSchema)
assertNoCharTypeInSchema(c.tableSchema)
ReplaceTable(
catalog.asTableCatalog,
@@ -2184,7 +2184,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
* Create a Spark DataType.
*/
private def visitSparkDataType(ctx: DataTypeContext): DataType = {
HiveStringType.replaceCharType(typedVisit(ctx))
HiveVoidType.replaceVoidType(HiveStringType.replaceCharType(typedVisit(ctx)))
Contributor

I don't get why we need HiveVoidType. What happens if we just parse void to NullType?

Contributor Author

Because it indicates that VOID is a Hive type, so the handling is more unified. Alternatively, we could just use PR #28833.

Contributor Author

For example, the function below makes it clear that the failure is due to the legacy Hive VOID type. If we mix VOID and NULL, I am not sure it would be better than keeping them separate.

  def failVoidType(dt: DataType): Unit = {
    if (HiveVoidType.containsVoidType(dt)) {
      throw new AnalysisException(
        "Cannot create tables with Hive VOID type.")
    }
  }

Contributor

VOID and NULL are indeed the same type. We can just check for the null type and fail with the error message: Cannot create tables with VOID type.

Contributor

The point is consistency: the VOID type in a SQL statement should be the same as the NullType specified via the Scala API in spark.catalog.createTable.

Contributor Author

@cloud-fan, OK. I will follow your suggestion and fix it in #28833, since this PR is a refactor that introduces the new HiveVoidType; we no longer need it here.
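
For illustration, the NullType-based check suggested above could look roughly like the sketch below (not code from this PR; the helper names are hypothetical, and the actual change is deferred to #28833):

  import org.apache.spark.sql.AnalysisException
  import org.apache.spark.sql.types._

  // Hypothetical sketch: parse VOID straight to NullType and reject any schema that contains it.
  def containsNullType(dt: DataType): Boolean = dt match {
    case ArrayType(et, _) => containsNullType(et)
    case MapType(kt, vt, _) => containsNullType(kt) || containsNullType(vt)
    case StructType(fields) => fields.exists(f => containsNullType(f.dataType))
    case _ => dt.isInstanceOf[NullType]
  }

  def failNullType(dt: DataType): Unit = {
    if (containsNullType(dt)) {
      throw new AnalysisException("Cannot create tables with VOID type.")
    }
  }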

}

/**
@@ -2212,6 +2212,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
case ("decimal" | "dec" | "numeric", precision :: scale :: Nil) =>
DecimalType(precision.getText.toInt, scale.getText.toInt)
case ("interval", Nil) => CalendarIntervalType
case ("void", Nil) => HiveVoidType
Contributor

We can just reuse NullType. We should forbid creating tables with the null type completely, including via spark.catalog.createTable.

case (dt, params) =>
val dtStr = if (params.nonEmpty) s"$dt(${params.mkString(",")})" else dt
throw new ParseException(s"DataType $dtStr is not supported.", ctx)
@@ -2258,9 +2259,9 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
builder.putString("comment", _)
}

// Add Hive type string to metadata.
// Add Hive type 'string' and 'void' to metadata.
val rawDataType = typedVisit[DataType](ctx.dataType)
val cleanedDataType = HiveStringType.replaceCharType(rawDataType)
val cleanedDataType = HiveVoidType.replaceVoidType(HiveStringType.replaceCharType(rawDataType))
if (rawDataType != cleanedDataType) {
builder.putString(HIVE_TYPE_STRING, rawDataType.catalogString)
}
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.AlterTable
import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.types.{ArrayType, DataType, HIVE_TYPE_STRING, HiveStringType, MapType, StructField, StructType}
import org.apache.spark.sql.types.{ArrayType, DataType, HIVE_TYPE_STRING, HiveStringType, HiveVoidType, MapType, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.Utils

@@ -339,11 +339,26 @@ private[sql] object CatalogV2Util {
}
}

def failVoidType(dt: DataType): Unit = {
if (HiveVoidType.containsVoidType(dt)) {
throw new AnalysisException(
"Cannot create tables with Hive VOID type.")
}
}

def assertNoCharTypeInSchema(schema: StructType): Unit = {
schema.foreach { f =>
if (f.metadata.contains(HIVE_TYPE_STRING)) {
failCharType(CatalystSqlParser.parseRawDataType(f.metadata.getString(HIVE_TYPE_STRING)))
}
}
}

def assertNoVoidTypeInSchema(schema: StructType): Unit = {
schema.foreach { f =>
if (f.metadata.contains(HIVE_TYPE_STRING)) {
failVoidType(CatalystSqlParser.parseRawDataType(f.metadata.getString(HIVE_TYPE_STRING)))
}
}
}
}
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.types

/**
* A Hive VOID type for compatibility. These data types should only be used for parsing,
* and should NOT be used anywhere else. Any instance of these data types should be
* replaced by a [[NullType]] before analysis.
*/
class HiveVoidType private() extends DataType {

override def defaultSize: Int = 1

override private[spark] def asNullable: HiveVoidType = this

override def simpleString: String = "void"
}

case object HiveVoidType extends HiveVoidType {
def replaceVoidType(dt: DataType): DataType = dt match {
case ArrayType(et, nullable) =>
ArrayType(replaceVoidType(et), nullable)
case MapType(kt, vt, nullable) =>
MapType(replaceVoidType(kt), replaceVoidType(vt), nullable)
case StructType(fields) =>
StructType(fields.map(f => f.copy(dataType = replaceVoidType(f.dataType))))
case _: HiveVoidType => NullType
case _ => dt
}

def containsVoidType(dt: DataType): Boolean = dt match {
case ArrayType(et, _) => containsVoidType(et)
case MapType(kt, vt, _) => containsVoidType(kt) || containsVoidType(vt)
case StructType(fields) => fields.exists(f => containsVoidType(f.dataType))
case _ => dt.isInstanceOf[HiveVoidType]
}
}
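
As a quick illustration of how the two helpers above behave (assuming the usual org.apache.spark.sql.types imports), a schema parsed with nested VOIDs is normalized to NullType, and containsVoidType detects them recursively:

  // Illustration only, not part of this PR.
  val parsed = StructType(Seq(
    StructField("a", HiveVoidType),
    StructField("b", ArrayType(HiveVoidType))))

  HiveVoidType.containsVoidType(parsed)  // true
  HiveVoidType.replaceVoidType(parsed)
  // StructType(StructField(a, NullType, true), StructField(b, ArrayType(NullType, true), true))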
@@ -62,6 +62,7 @@ class DataTypeParserSuite extends SparkFunSuite {
checkDataType("cHaR(27)", StringType)
checkDataType("BINARY", BinaryType)
checkDataType("interval", CalendarIntervalType)
checkDataType("void", NullType)

checkDataType("array<doublE>", ArrayType(DoubleType, true))
checkDataType("Array<map<int, tinYint>>", ArrayType(MapType(IntegerType, ByteType, true), true))
@@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, RefreshTable}
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{HIVE_TYPE_STRING, HiveStringType, MetadataBuilder, StructField, StructType}
import org.apache.spark.sql.types.{HIVE_TYPE_STRING, HiveStringType, HiveVoidType, MetadataBuilder, StructField, StructType}

/**
* Resolves catalogs from the multi-part identifiers in SQL statements, and convert the statements
@@ -50,6 +50,7 @@ class ResolveSessionCatalog(
nameParts @ SessionCatalogAndTable(catalog, tbl), cols) =>
loadTable(catalog, tbl.asIdentifier).collect {
case v1Table: V1Table =>
cols.foreach(c => failVoidType(c.dataType))
if (!DDLUtils.isHiveTable(v1Table.v1Table)) {
cols.foreach(c => failCharType(c.dataType))
}
@@ -62,6 +63,7 @@ }
}
AlterTableAddColumnsCommand(tbl.asTableIdentifier, cols.map(convertToStructField))
}.getOrElse {
cols.foreach(c => failVoidType(c.dataType))
cols.foreach(c => failCharType(c.dataType))
val changes = cols.map { col =>
TableChange.addColumn(
@@ -80,6 +82,7 @@
case Some(_: V1Table) =>
throw new AnalysisException("REPLACE COLUMNS is only supported with v2 tables.")
case Some(table) =>
cols.foreach(c => failVoidType(c.dataType))
cols.foreach(c => failCharType(c.dataType))
// REPLACE COLUMNS deletes all the existing columns and adds new columns specified.
val deleteChanges = table.schema.fieldNames.map { name =>
@@ -102,6 +105,7 @@
nameParts @ SessionCatalogAndTable(catalog, tbl), _, _, _, _, _) =>
loadTable(catalog, tbl.asIdentifier).collect {
case v1Table: V1Table =>
a.dataType.foreach(failVoidType)
if (!DDLUtils.isHiveTable(v1Table.v1Table)) {
a.dataType.foreach(failCharType)
}
@@ -131,8 +135,9 @@
s"Available: ${v1Table.schema.fieldNames.mkString(", ")}")
}
}
// Add Hive type string to metadata.
val cleanedDataType = HiveStringType.replaceCharType(dataType)
// Add Hive type 'string' and 'void' to metadata.
Contributor

We can be more aggressive here: forbid the void type in all cases, including Hive tables.

val cleanedDataType =
HiveVoidType.replaceVoidType(HiveStringType.replaceCharType(dataType))
if (dataType != cleanedDataType) {
builder.putString(HIVE_TYPE_STRING, dataType.catalogString)
}
@@ -143,6 +148,7 @@
builder.build())
AlterTableChangeColumnCommand(tbl.asTableIdentifier, colName, newColumn)
}.getOrElse {
a.dataType.foreach(failVoidType)
a.dataType.foreach(failCharType)
val colName = a.column.toArray
val typeChange = a.dataType.map { newDataType =>
@@ -270,6 +276,7 @@ class ResolveSessionCatalog(
SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
val provider = c.provider.getOrElse(conf.defaultDataSourceName)
if (!isV2Provider(provider)) {
assertNoVoidTypeInSchema(c.tableSchema)
if (!DDLUtils.isHiveTable(Some(provider))) {
assertNoCharTypeInSchema(c.tableSchema)
}
@@ -279,6 +286,7 @@
val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTable(tableDesc, mode, None)
} else {
assertNoVoidTypeInSchema(c.tableSchema)
assertNoCharTypeInSchema(c.tableSchema)
CreateV2Table(
catalog.asTableCatalog,
@@ -323,6 +331,7 @@ class ResolveSessionCatalog(
if (!isV2Provider(provider)) {
throw new AnalysisException("REPLACE TABLE is only supported with v2 tables.")
} else {
assertNoVoidTypeInSchema(c.tableSchema)
assertNoCharTypeInSchema(c.tableSchema)
ReplaceTable(
catalog.asTableCatalog,
@@ -719,7 +728,8 @@ class ResolveSessionCatalog(
val builder = new MetadataBuilder
col.comment.foreach(builder.putString("comment", _))

val cleanedDataType = HiveStringType.replaceCharType(col.dataType)
val cleanedDataType =
HiveVoidType.replaceVoidType(HiveStringType.replaceCharType(col.dataType))
if (col.dataType != cleanedDataType) {
builder.putString(HIVE_TYPE_STRING, col.dataType.catalogString)
}
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogV2Util.assertNoVoidTypeInSchema
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
import org.apache.spark.sql.execution.datasources.CreateTable
@@ -106,6 +107,7 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
} else {
withStorage
}
assertNoVoidTypeInSchema(withSchema.schema)

c.copy(tableDesc = withSchema)
}
@@ -2309,6 +2309,85 @@ class HiveDDLSuite
}
}

test("SPARK-20680: Spark-sql do not support for void column datatype of view") {
withTable("t") {
withView("tblVoidType") {
val client =
spark.sharedState.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client
client.runSqlHive("CREATE TABLE t (t1 int)")
client.runSqlHive("INSERT INTO t VALUES (3)")
client.runSqlHive("CREATE VIEW tblVoidType AS SELECT NULL AS col FROM t")
checkAnswer(spark.table("tblVoidType"), Row(null))
// No exception shows
val desc = spark.sql("DESC tblVoidType").collect().toSeq
assert(desc.contains(Row("col", "null", null)))
}
}

// Forbid creating Hive table with void type in Spark
withTable("t1", "t2", "t3") {
val e1 = intercept[AnalysisException] {
spark.sql("CREATE TABLE t1 (v void) USING parquet")
}.getMessage
assert(e1.contains("Cannot create tables with Hive VOID type"))
val e2 = intercept[AnalysisException] {
spark.sql("CREATE TABLE t2 (v void) USING hive")
}.getMessage
assert(e2.contains("Cannot create tables with Hive VOID type"))
val e3 = intercept[AnalysisException] {
spark.sql("CREATE TABLE t3 (v void)")
}.getMessage
assert(e3.contains("Cannot create tables with Hive VOID type"))
}

// Make sure spark.catalog.createTable with void type will fail
val schema1 = new StructType().add("c", HiveVoidType)
assertHiveTableVoidType(schema1)
assertDSTableVoidType(schema1)

val schema2 = new StructType()
.add("c", StructType(Seq(StructField.apply("c1", HiveVoidType))))
assertHiveTableVoidType(schema2)
assertDSTableVoidType(schema2)

val schema3 = new StructType().add("c", ArrayType(HiveVoidType))
assertHiveTableVoidType(schema3)
assertDSTableVoidType(schema3)

val schema4 = new StructType()
.add("c", MapType(StringType, HiveVoidType))
assertHiveTableVoidType(schema4)
assertDSTableVoidType(schema4)

val schema5 = new StructType()
.add("c", MapType(HiveVoidType, StringType))
assertHiveTableVoidType(schema5)
assertDSTableVoidType(schema5)
}

private def assertHiveTableVoidType(schema: StructType): Unit = {
withTable("t") {
intercept[AnalysisException] {
spark.catalog.createTable(
tableName = "t",
source = "hive",
schema = schema,
options = Map("fileFormat" -> "parquet"))
}
}
}
private def assertDSTableVoidType(schema: StructType): Unit = {
withTable("t") {
intercept[AnalysisException] {
spark.catalog.createTable(
tableName = "t",
source = "parquet",
schema = schema,
options = Map.empty[String, String])
}
}
}

test("SPARK-21216: join with a streaming DataFrame") {
import org.apache.spark.sql.execution.streaming.MemoryStream
import testImplicits._