@@ -25,11 +25,12 @@ import java.time.{Duration, Instant, LocalDate, LocalDateTime, LocalTime, Period
import java.util.{Map => JavaMap}
import javax.annotation.Nullable

import scala.language.existentials

import org.apache.spark.SparkIllegalArgumentException
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.DayTimeIntervalType._
@@ -60,6 +61,7 @@ object CatalystTypeConverters {
}

private def getConverterForType(dataType: DataType): CatalystTypeConverter[Any, Any, Any] = {
TypeUtils.failUnsupportedDataType(dataType, SQLConf.get)
val converter = dataType match {
case udt: UserDefinedType[_] => UDTConverter(udt)
case arrayType: ArrayType => ArrayConverter(arrayType.elementType)
@@ -68,18 +70,12 @@
case CharType(length) => new CharConverter(length)
case VarcharType(length) => new VarcharConverter(length)
case _: StringType => StringConverter
case _ @ (_: GeographyType | _: GeometryType) if !SQLConf.get.geospatialEnabled =>
throw new org.apache.spark.sql.AnalysisException(
errorClass = "UNSUPPORTED_FEATURE.GEOSPATIAL_DISABLED",
messageParameters = scala.collection.immutable.Map.empty)
case g: GeographyType =>
new GeographyConverter(g)
case g: GeometryType =>
new GeometryConverter(g)
case DateType if SQLConf.get.datetimeJava8ApiEnabled => LocalDateConverter
case DateType => DateConverter
case _: TimeType if !SQLConf.get.isTimeTypeEnabled =>
QueryCompilationErrors.unsupportedTimeTypeError()
case _: TimeType => TimeConverter
case TimestampType if SQLConf.get.datetimeJava8ApiEnabled => InstantConverter
case TimestampType => TimestampConverter
@@ -345,6 +345,8 @@ object DeserializerBuildHelper {
createDeserializerForInstant(path)
case LocalDateTimeEncoder =>
createDeserializerForLocalDateTime(path)
case LocalTimeEncoder if !SQLConf.get.isTimeTypeEnabled =>
Member: Was this missed before, or is it a side effect of the unification (which would then be missed in another layer)?

Contributor Author: It was missed before; I found it when trying to unify the geo type blocking code.

throw org.apache.spark.sql.errors.QueryCompilationErrors.unsupportedTimeTypeError()
Member: Shall we import org.apache.spark.sql.errors.QueryCompilationErrors?

Contributor Author: The geo type blocking does not import org.apache.spark.sql.AnalysisException either. It's temporary and will be removed after we complete the time type development, so I think it's fine.

case LocalTimeEncoder =>
createDeserializerForLocalTime(path)
case UDTEncoder(udt, udtClass) =>
@@ -367,6 +367,8 @@ object SerializerBuildHelper {
case TimestampEncoder(false) => createSerializerForSqlTimestamp(input)
case InstantEncoder(false) => createSerializerForJavaInstant(input)
case LocalDateTimeEncoder => createSerializerForLocalDateTime(input)
case LocalTimeEncoder if !SQLConf.get.isTimeTypeEnabled =>
Member: Ditto.

throw org.apache.spark.sql.errors.QueryCompilationErrors.unsupportedTimeTypeError()
Member: Ditto; import org.apache.spark.sql.errors.QueryCompilationErrors?

case LocalTimeEncoder => createSerializerForLocalTime(input)
case UDTEncoder(udt, udtClass) => createSerializerForUserDefinedType(input, udt, udtClass)
case OptionEncoder(valueEnc) =>
@@ -712,6 +712,7 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString
}

create.tableSchema.foreach(f => TypeUtils.failWithIntervalType(f.dataType))
TypeUtils.failUnsupportedDataType(create.tableSchema, SQLConf.get)
Contributor Author: This is a bit off topic, but it just follows how we block interval types for v2 catalogs, which we don't have any builtin implementation for in Spark yet.

Member: Could you use a new JIRA ID instead of a follow-up, @cloud-fan?

SchemaUtils.checkIndeterminateCollationInSchema(create.tableSchema)

case write: V2WriteCommand if write.resolved =>
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.catalyst.util.IntervalUtils.{dayTimeIntervalToByte, dayTimeIntervalToDecimal, dayTimeIntervalToInt, dayTimeIntervalToLong, dayTimeIntervalToShort, yearMonthIntervalToByte, yearMonthIntervalToInt, yearMonthIntervalToShort}
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase, QueryExecutionErrors}
import org.apache.spark.sql.errors.{QueryErrorsBase, QueryExecutionErrors}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.{GeographyVal, UTF8String, VariantVal}
@@ -90,12 +90,6 @@ object Cast extends QueryErrorsBase {
* - String <=> Binary
*/
def canAnsiCast(from: DataType, to: DataType): Boolean = (from, to) match {
case (fromType, toType) if !SQLConf.get.geospatialEnabled &&
(isGeoSpatialType(fromType) || isGeoSpatialType(toType)) =>
throw new org.apache.spark.sql.AnalysisException(
errorClass = "UNSUPPORTED_FEATURE.GEOSPATIAL_DISABLED",
messageParameters = scala.collection.immutable.Map.empty)

case (fromType, toType) if fromType == toType => true

case (NullType, _) => true
@@ -224,12 +218,6 @@
* Returns true iff we can cast `from` type to `to` type.
*/
def canCast(from: DataType, to: DataType): Boolean = (from, to) match {
case (fromType, toType) if !SQLConf.get.geospatialEnabled &&
(isGeoSpatialType(fromType) || isGeoSpatialType(toType)) =>
throw new org.apache.spark.sql.AnalysisException(
errorClass = "UNSUPPORTED_FEATURE.GEOSPATIAL_DISABLED",
messageParameters = scala.collection.immutable.Map.empty)

case (fromType, toType) if fromType == toType => true

case (NullType, _) => true
@@ -617,12 +605,7 @@ case class Cast(
}

override def checkInputDataTypes(): TypeCheckResult = {
dataType match {
// If the cast is to a TIME type, first check if TIME type is enabled.
case _: TimeType if !SQLConf.get.isTimeTypeEnabled =>
throw QueryCompilationErrors.unsupportedTimeTypeError()
case _ =>
}
TypeUtils.failUnsupportedDataType(dataType, SQLConf.get)
val canCast = evalMode match {
case EvalMode.LEGACY => Cast.canCast(child.dataType, dataType)
case EvalMode.ANSI => Cast.canAnsiCast(child.dataType, dataType)
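A note on the Cast change above: canCast and canAnsiCast are castability predicates, and the removed blocks made them throw as a side effect; the PR instead runs the feature gate once in checkInputDataTypes, before the predicate is consulted. Below is a standalone sketch of that shape — the names (CastGateSketch, SimpleType, and friends) are deliberately hypothetical illustrations, not Spark APIs:

```scala
// Hypothetical, self-contained sketch: keep the castability predicate pure
// and run the throwing feature gate separately, once, at analysis time.
object CastGateSketch {
  sealed trait SimpleType
  case object IntT extends SimpleType
  case object TimeT extends SimpleType

  // Pure predicate: only answers "is this cast allowed?", never throws.
  def canCast(from: SimpleType, to: SimpleType): Boolean =
    from == to

  // Throwing gate, mirroring TypeUtils.failUnsupportedDataType in the diff.
  def failIfDisabled(t: SimpleType, timeTypeEnabled: Boolean): Unit =
    if (!timeTypeEnabled && t == TimeT) {
      throw new IllegalArgumentException("TIME type is disabled")
    }

  // Analysis-time entry point: gate first, then consult the predicate,
  // so callers that merely probe castability never trip the gate.
  def checkCast(from: SimpleType, to: SimpleType, timeTypeEnabled: Boolean): Boolean = {
    failIfDisabled(from, timeTypeEnabled)
    failIfDisabled(to, timeTypeEnabled)
    canCast(from, to)
  }
}
```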
@@ -112,6 +112,7 @@ case class AddColumns(
columnsToAdd: Seq[QualifiedColType]) extends AlterTableCommand {
columnsToAdd.foreach { c =>
TypeUtils.failWithIntervalType(c.dataType)
TypeUtils.failUnsupportedDataType(c.dataType, conf)
}

override lazy val resolved: Boolean = table.resolved && columnsToAdd.forall(_.resolved)
@@ -144,6 +145,7 @@ case class ReplaceColumns(
columnsToAdd: Seq[QualifiedColType]) extends AlterTableCommand {
columnsToAdd.foreach { c =>
TypeUtils.failWithIntervalType(c.dataType)
TypeUtils.failUnsupportedDataType(c.dataType, conf)
}

override lazy val resolved: Boolean = table.resolved && columnsToAdd.forall(_.resolved)
@@ -20,8 +20,10 @@ package org.apache.spark.sql.catalyst.util
import org.apache.spark.sql.catalyst.analysis.{AnalysisErrorAt, TypeCheckResult, TypeCoercion}
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
import org.apache.spark.sql.catalyst.expressions.{Expression, RowOrdering}
import org.apache.spark.sql.catalyst.expressions.st.STExpressionUtils.isGeoSpatialType
import org.apache.spark.sql.catalyst.types.{PhysicalDataType, PhysicalNumericType}
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

/**
@@ -136,4 +138,15 @@ object TypeUtils extends QueryErrorsBase {
}
if (dataType.existsRecursively(isInterval)) f
}

def failUnsupportedDataType(dataType: DataType, conf: SQLConf): Unit = {
if (!conf.isTimeTypeEnabled && dataType.existsRecursively(_.isInstanceOf[TimeType])) {
throw QueryCompilationErrors.unsupportedTimeTypeError()
}
if (!conf.geospatialEnabled && dataType.existsRecursively(isGeoSpatialType)) {
throw new org.apache.spark.sql.AnalysisException(
errorClass = "UNSUPPORTED_FEATURE.GEOSPATIAL_DISABLED",
messageParameters = scala.collection.immutable.Map.empty)
}
}
}
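Aside: a minimal sketch of how this new helper behaves for nested schemas, assuming Spark's catalyst types are on the classpath and that TimeType() constructs the type with its default precision (the schema itself is made up for illustration). Since the check uses existsRecursively, one top-level call covers arbitrarily nested types:

```scala
import org.apache.spark.sql.catalyst.util.TypeUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

// A TIME type buried inside an array of structs is still found by the
// single recursive existsRecursively scan.
val schema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("events", ArrayType(StructType(Seq(
    StructField("at", TimeType())))))))

// No-op when both feature flags are enabled; otherwise throws the
// corresponding error (unsupportedTimeTypeError, or the
// UNSUPPORTED_FEATURE.GEOSPATIAL_DISABLED AnalysisException).
TypeUtils.failUnsupportedDataType(schema, SQLConf.get)
```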
@@ -29,19 +29,18 @@ import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.ExecutePlanResponse
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.st.STExpressionUtils
import org.apache.spark.sql.catalyst.util.TypeUtils
import org.apache.spark.sql.classic.{DataFrame, Dataset}
import org.apache.spark.sql.connect.common.DataTypeProtoConverter
import org.apache.spark.sql.connect.common.LiteralValueProtoConverter.toLiteralProto
import org.apache.spark.sql.connect.config.Connect.{CONNECT_GRPC_ARROW_MAX_BATCH_SIZE, CONNECT_SESSION_RESULT_CHUNKING_MAX_CHUNK_SIZE}
import org.apache.spark.sql.connect.planner.{InvalidInputErrors, SparkConnectPlanner}
import org.apache.spark.sql.connect.service.ExecuteHolder
import org.apache.spark.sql.connect.utils.{MetricGenerator, PipelineAnalysisContextUtils}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.{DoNotCleanup, LocalTableScanExec, QueryExecution, RemoveShuffleFiles, SkipMigration, SQLExecution}
import org.apache.spark.sql.execution.arrow.ArrowConverters
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, StructType, TimeType}
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.util.ThreadUtils

/**
@@ -138,16 +137,7 @@ private[execution] class SparkConnectPlanExecution(executeHolder: ExecuteHolder)
val sessionId = executePlan.sessionHolder.sessionId
val spark = dataframe.sparkSession
val schema = dataframe.schema
val geospatialEnabled = spark.sessionState.conf.geospatialEnabled
if (!geospatialEnabled && schema.existsRecursively(STExpressionUtils.isGeoSpatialType)) {
throw new org.apache.spark.sql.AnalysisException(
errorClass = "UNSUPPORTED_FEATURE.GEOSPATIAL_DISABLED",
messageParameters = scala.collection.immutable.Map.empty)
}
val timeTypeEnabled = spark.sessionState.conf.isTimeTypeEnabled
if (!timeTypeEnabled && schema.existsRecursively(_.isInstanceOf[TimeType])) {
throw QueryCompilationErrors.unsupportedTimeTypeError()
}
TypeUtils.failUnsupportedDataType(schema, spark.sessionState.conf)
val maxRecordsPerBatch = spark.sessionState.conf.arrowMaxRecordsPerBatch
val timeZoneId = spark.sessionState.conf.sessionLocalTimeZone
val largeVarTypes = spark.sessionState.conf.arrowUseLargeVarTypes
@@ -30,7 +30,7 @@ import org.apache.spark.{SparkException, SparkUpgradeException}
import org.apache.spark.sql.{sources, SPARK_LEGACY_DATETIME_METADATA_KEY, SPARK_LEGACY_INT96_METADATA_KEY, SPARK_TIMEZONE_METADATA_KEY, SPARK_VERSION_METADATA_KEY}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, ExpressionSet, PredicateHelper}
import org.apache.spark.sql.catalyst.util.RebaseDateTime
import org.apache.spark.sql.catalyst.util.{RebaseDateTime, TypeUtils}
import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
@@ -93,9 +93,7 @@ object DataSourceUtils extends PredicateHelper {
* on the driver side.
*/
def verifySchema(format: FileFormat, schema: StructType, readOnly: Boolean = false): Unit = {
if (!SQLConf.get.isTimeTypeEnabled && schema.existsRecursively(_.isInstanceOf[TimeType])) {
throw QueryCompilationErrors.unsupportedTimeTypeError()
}
TypeUtils.failUnsupportedDataType(schema, SQLConf.get)
schema.foreach { field =>
val supported = if (readOnly) {
format.supportReadDataType(field.dataType)
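Finally, a hedged usage sketch for verifySchema. Using ParquetFileFormat as the example format is my assumption; the readOnly flag switching to supportReadDataType is taken from the diff, while the write-side counterpart supportDataType is assumed from the FileFormat API:

```scala
import org.apache.spark.sql.execution.datasources.DataSourceUtils
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types._

val schema = StructType(Seq(StructField("ts", TimestampType)))
val format = new ParquetFileFormat()

// Write-side check: every field must pass the format's write-support test.
DataSourceUtils.verifySchema(format, schema)
// Read-side check: uses format.supportReadDataType instead, so a type can
// be readable without being writable. Both paths first run
// TypeUtils.failUnsupportedDataType on the whole schema.
DataSourceUtils.verifySchema(format, schema, readOnly = true)
```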