diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index a90720ac5108..cdbd2d49e8b7
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -25,11 +25,12 @@ import java.time.{Duration, Instant, LocalDate, LocalDateTime, LocalTime, Period
 import java.util.{Map => JavaMap}
 import javax.annotation.Nullable
 
+import scala.language.existentials
+
 import org.apache.spark.SparkIllegalArgumentException
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.types.DayTimeIntervalType._
@@ -60,6 +61,7 @@ object CatalystTypeConverters {
   }
 
   private def getConverterForType(dataType: DataType): CatalystTypeConverter[Any, Any, Any] = {
+    TypeUtils.failUnsupportedDataType(dataType, SQLConf.get)
     val converter = dataType match {
       case udt: UserDefinedType[_] => UDTConverter(udt)
       case arrayType: ArrayType => ArrayConverter(arrayType.elementType)
@@ -68,18 +70,12 @@
       case CharType(length) => new CharConverter(length)
       case VarcharType(length) => new VarcharConverter(length)
       case _: StringType => StringConverter
-      case _ @ (_: GeographyType | _: GeometryType) if !SQLConf.get.geospatialEnabled =>
-        throw new org.apache.spark.sql.AnalysisException(
-          errorClass = "UNSUPPORTED_FEATURE.GEOSPATIAL_DISABLED",
-          messageParameters = scala.collection.immutable.Map.empty)
       case g: GeographyType => new GeographyConverter(g)
       case g: GeometryType => new GeometryConverter(g)
       case DateType if SQLConf.get.datetimeJava8ApiEnabled => LocalDateConverter
       case DateType => DateConverter
-      case _: TimeType if !SQLConf.get.isTimeTypeEnabled =>
-        QueryCompilationErrors.unsupportedTimeTypeError()
       case _: TimeType => TimeConverter
       case TimestampType if SQLConf.get.datetimeJava8ApiEnabled => InstantConverter
       case TimestampType => TimestampConverter
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
index 85a5cf4f6b26..080794643fa0
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
@@ -345,6 +345,8 @@ object DeserializerBuildHelper {
       createDeserializerForInstant(path)
     case LocalDateTimeEncoder =>
       createDeserializerForLocalDateTime(path)
+    case LocalTimeEncoder if !SQLConf.get.isTimeTypeEnabled =>
+      throw org.apache.spark.sql.errors.QueryCompilationErrors.unsupportedTimeTypeError()
     case LocalTimeEncoder =>
       createDeserializerForLocalTime(path)
     case UDTEncoder(udt, udtClass) =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
index 32fb859745d8..b8b2406a5813
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
@@ -367,6 +367,8 @@ object SerializerBuildHelper {
     case TimestampEncoder(false) => createSerializerForSqlTimestamp(input)
     case InstantEncoder(false) => createSerializerForJavaInstant(input)
     case LocalDateTimeEncoder => createSerializerForLocalDateTime(input)
+    case LocalTimeEncoder if !SQLConf.get.isTimeTypeEnabled =>
+      throw org.apache.spark.sql.errors.QueryCompilationErrors.unsupportedTimeTypeError()
     case LocalTimeEncoder => createSerializerForLocalTime(input)
     case UDTEncoder(udt, udtClass) => createSerializerForUserDefinedType(input, udt, udtClass)
     case OptionEncoder(valueEnc) =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 04451162276f..3b8a363e704a
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -712,6 +712,7 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString
         }
 
         create.tableSchema.foreach(f => TypeUtils.failWithIntervalType(f.dataType))
+        TypeUtils.failUnsupportedDataType(create.tableSchema, SQLConf.get)
         SchemaUtils.checkIndeterminateCollationInSchema(create.tableSchema)
 
       case write: V2WriteCommand if write.resolved =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 3720b070db65..727b155e8579
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.catalyst.util.DateTimeConstants._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils._
 import org.apache.spark.sql.catalyst.util.IntervalUtils.{dayTimeIntervalToByte, dayTimeIntervalToDecimal, dayTimeIntervalToInt, dayTimeIntervalToLong, dayTimeIntervalToShort, yearMonthIntervalToByte, yearMonthIntervalToInt, yearMonthIntervalToShort}
-import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase, QueryExecutionErrors}
+import org.apache.spark.sql.errors.{QueryErrorsBase, QueryExecutionErrors}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.{GeographyVal, UTF8String, VariantVal}
@@ -90,12 +90,6 @@ object Cast extends QueryErrorsBase {
    * - String <=> Binary
    */
   def canAnsiCast(from: DataType, to: DataType): Boolean = (from, to) match {
-    case (fromType, toType) if !SQLConf.get.geospatialEnabled &&
-        (isGeoSpatialType(fromType) || isGeoSpatialType(toType)) =>
-      throw new org.apache.spark.sql.AnalysisException(
-        errorClass = "UNSUPPORTED_FEATURE.GEOSPATIAL_DISABLED",
-        messageParameters = scala.collection.immutable.Map.empty)
-
     case (fromType, toType) if fromType == toType => true
 
     case (NullType, _) => true
@@ -224,12 +218,6 @@ object Cast extends QueryErrorsBase {
    * Returns true iff we can cast `from` type to `to` type.
    */
   def canCast(from: DataType, to: DataType): Boolean = (from, to) match {
-    case (fromType, toType) if !SQLConf.get.geospatialEnabled &&
-        (isGeoSpatialType(fromType) || isGeoSpatialType(toType)) =>
-      throw new org.apache.spark.sql.AnalysisException(
-        errorClass = "UNSUPPORTED_FEATURE.GEOSPATIAL_DISABLED",
-        messageParameters = scala.collection.immutable.Map.empty)
-
     case (fromType, toType) if fromType == toType => true
 
     case (NullType, _) => true
@@ -617,12 +605,7 @@ case class Cast(
   }
 
   override def checkInputDataTypes(): TypeCheckResult = {
-    dataType match {
-      // If the cast is to a TIME type, first check if TIME type is enabled.
-      case _: TimeType if !SQLConf.get.isTimeTypeEnabled =>
-        throw QueryCompilationErrors.unsupportedTimeTypeError()
-      case _ =>
-    }
+    TypeUtils.failUnsupportedDataType(dataType, SQLConf.get)
     val canCast = evalMode match {
       case EvalMode.LEGACY => Cast.canCast(child.dataType, dataType)
       case EvalMode.ANSI => Cast.canAnsiCast(child.dataType, dataType)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
index 4ec8baf351cb..843ce22061d8
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
@@ -112,6 +112,7 @@ case class AddColumns(
     columnsToAdd: Seq[QualifiedColType]) extends AlterTableCommand {
   columnsToAdd.foreach { c =>
     TypeUtils.failWithIntervalType(c.dataType)
+    TypeUtils.failUnsupportedDataType(c.dataType, conf)
   }
 
   override lazy val resolved: Boolean = table.resolved && columnsToAdd.forall(_.resolved)
@@ -144,6 +145,7 @@ case class ReplaceColumns(
     columnsToAdd: Seq[QualifiedColType]) extends AlterTableCommand {
   columnsToAdd.foreach { c =>
     TypeUtils.failWithIntervalType(c.dataType)
+    TypeUtils.failUnsupportedDataType(c.dataType, conf)
   }
 
   override lazy val resolved: Boolean = table.resolved && columnsToAdd.forall(_.resolved)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
index 9f89f068b756..9c5df04f9569
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
@@ -20,8 +20,10 @@ package org.apache.spark.sql.catalyst.util
 import org.apache.spark.sql.catalyst.analysis.{AnalysisErrorAt, TypeCheckResult, TypeCoercion}
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
 import org.apache.spark.sql.catalyst.expressions.{Expression, RowOrdering}
+import org.apache.spark.sql.catalyst.expressions.st.STExpressionUtils.isGeoSpatialType
 import org.apache.spark.sql.catalyst.types.{PhysicalDataType, PhysicalNumericType}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 /**
@@ -136,4 +138,15 @@ object TypeUtils extends QueryErrorsBase {
     }
     if (dataType.existsRecursively(isInterval)) f
   }
+
+  def failUnsupportedDataType(dataType: DataType, conf: SQLConf): Unit = {
+    if (!conf.isTimeTypeEnabled && dataType.existsRecursively(_.isInstanceOf[TimeType])) {
+      throw QueryCompilationErrors.unsupportedTimeTypeError()
+    }
+    if (!conf.geospatialEnabled && dataType.existsRecursively(isGeoSpatialType)) {
+      throw new org.apache.spark.sql.AnalysisException(
+        errorClass = "UNSUPPORTED_FEATURE.GEOSPATIAL_DISABLED",
+        messageParameters = scala.collection.immutable.Map.empty)
+    }
+  }
 }
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala
index a5799b543d72..74a55c698636
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala
@@ -29,7 +29,7 @@ import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.ExecutePlanResponse
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.st.STExpressionUtils
+import org.apache.spark.sql.catalyst.util.TypeUtils
 import org.apache.spark.sql.classic.{DataFrame, Dataset}
 import org.apache.spark.sql.connect.common.DataTypeProtoConverter
 import org.apache.spark.sql.connect.common.LiteralValueProtoConverter.toLiteralProto
@@ -37,11 +37,10 @@ import org.apache.spark.sql.connect.config.Connect.{CONNECT_GRPC_ARROW_MAX_BATCH
 import org.apache.spark.sql.connect.planner.{InvalidInputErrors, SparkConnectPlanner}
 import org.apache.spark.sql.connect.service.ExecuteHolder
 import org.apache.spark.sql.connect.utils.{MetricGenerator, PipelineAnalysisContextUtils}
-import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.{DoNotCleanup, LocalTableScanExec, QueryExecution, RemoveShuffleFiles, SkipMigration, SQLExecution}
 import org.apache.spark.sql.execution.arrow.ArrowConverters
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{DataType, StructType, TimeType}
+import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.util.ThreadUtils
 
 /**
@@ -138,16 +137,7 @@ private[execution] class SparkConnectPlanExecution(executeHolder: ExecuteHolder)
     val sessionId = executePlan.sessionHolder.sessionId
     val spark = dataframe.sparkSession
     val schema = dataframe.schema
-    val geospatialEnabled = spark.sessionState.conf.geospatialEnabled
-    if (!geospatialEnabled && schema.existsRecursively(STExpressionUtils.isGeoSpatialType)) {
-      throw new org.apache.spark.sql.AnalysisException(
-        errorClass = "UNSUPPORTED_FEATURE.GEOSPATIAL_DISABLED",
-        messageParameters = scala.collection.immutable.Map.empty)
-    }
-    val timeTypeEnabled = spark.sessionState.conf.isTimeTypeEnabled
-    if (!timeTypeEnabled && schema.existsRecursively(_.isInstanceOf[TimeType])) {
-      throw QueryCompilationErrors.unsupportedTimeTypeError()
-    }
+    TypeUtils.failUnsupportedDataType(schema, spark.sessionState.conf)
     val maxRecordsPerBatch = spark.sessionState.conf.arrowMaxRecordsPerBatch
     val timeZoneId = spark.sessionState.conf.sessionLocalTimeZone
     val largeVarTypes = spark.sessionState.conf.arrowUseLargeVarTypes
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
index fb5b605bab01..3fd82573f001
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -30,7 +30,7 @@ import org.apache.spark.{SparkException, SparkUpgradeException}
 import org.apache.spark.sql.{sources, SPARK_LEGACY_DATETIME_METADATA_KEY, SPARK_LEGACY_INT96_METADATA_KEY, SPARK_TIMEZONE_METADATA_KEY, SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, ExpressionSet, PredicateHelper}
-import org.apache.spark.sql.catalyst.util.RebaseDateTime
+import org.apache.spark.sql.catalyst.util.{RebaseDateTime, TypeUtils}
 import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
@@ -93,9 +93,7 @@ object DataSourceUtils extends PredicateHelper {
    * in a driver side.
    */
  def verifySchema(format: FileFormat, schema: StructType, readOnly: Boolean = false): Unit = {
-    if (!SQLConf.get.isTimeTypeEnabled && schema.existsRecursively(_.isInstanceOf[TimeType])) {
-      throw QueryCompilationErrors.unsupportedTimeTypeError()
-    }
+    TypeUtils.failUnsupportedDataType(schema, SQLConf.get)
     schema.foreach { field =>
       val supported = if (readOnly) {
         format.supportReadDataType(field.dataType)