Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
// time-zone component and can be parsed with the timestamp formatter.
// Otherwise, it is likely to be a timestamp with timezone.
if (timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) {
SQLConf.get.timestampTypeInSchemaInference
SQLConf.get.timestampType
} else {
tryParseTimestamp(field)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
decimalTry.get
} else if (options.inferTimestamp &&
timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) {
SQLConf.get.timestampTypeInSchemaInference
SQLConf.get.timestampType
} else if (options.inferTimestamp &&
timestampFormatter.parseOptional(field).isDefined) {
TimestampType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1425,16 +1425,6 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

// Boolean conf entry, default false, keyed "spark.sql.sources.timestampNTZTypeInference.enabled".
// Per its doc string, it decides whether schema inference picks TimestampNTZ (true) or
// TimestampLTZ (false) when a column could be either type.
val INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES =
buildConf("spark.sql.sources.timestampNTZTypeInference.enabled")
.doc("For the schema inference of JSON/CSV/JDBC data sources and partition directories, " +
"this config determines whether to choose the TimestampNTZ type if a column can be " +
"either TimestampNTZ or TimestampLTZ type. If set to true, the inference result of " +
"the column will be TimestampNTZ type. Otherwise, the result will be TimestampLTZ type.")
.version("3.4.0")
.booleanConf
.createWithDefault(false)

val BUCKETING_ENABLED = buildConf("spark.sql.sources.bucketing.enabled")
.doc("When false, we will treat bucketed table as normal table")
.version("2.0.0")
Expand Down Expand Up @@ -3538,8 +3528,9 @@ object SQLConf {

val TIMESTAMP_TYPE =
buildConf("spark.sql.timestampType")
.doc("Configures the default timestamp type of Spark SQL, including SQL DDL, Cast clause " +
s"and type literal. Setting the configuration as ${TimestampTypes.TIMESTAMP_NTZ} will " +
.doc("Configures the default timestamp type of Spark SQL, including SQL DDL, Cast clause, " +
"type literal and the schema inference of data sources. " +
s"Setting the configuration as ${TimestampTypes.TIMESTAMP_NTZ} will " +
"use TIMESTAMP WITHOUT TIME ZONE as the default type while putting it as " +
s"${TimestampTypes.TIMESTAMP_LTZ} will use TIMESTAMP WITH LOCAL TIME ZONE. " +
"Before the 3.4.0 release, Spark only supports the TIMESTAMP WITH " +
Expand Down Expand Up @@ -4848,18 +4839,6 @@ class SQLConf extends Serializable with Logging {
TimestampNTZType
}

// Returns the current value of the INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES conf entry.
def inferTimestampNTZInDataSources: Boolean = getConf(INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES)

// Timestamp type that schema inference prefers when a column could be read as either
// Timestamp or TimestampNTZ: TimestampNTZType if INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES is set,
// TimestampType otherwise.
def timestampTypeInSchemaInference: AtomicType = {
val preferNtz = getConf(INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES)
if (preferNtz) TimestampNTZType else TimestampType
}

def nestedSchemaPruningEnabled: Boolean = getConf(NESTED_SCHEMA_PRUNING_ENABLED)

def serializerNestedSchemaPruningEnabled: Boolean =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,10 +249,7 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper {
// inferField should infer a column as string type if it contains a mix of dates and timestamps
assert(inferSchema.inferField(DateType, "2003|01|01") == StringType)
// SQL configuration must be set to default to TimestampNTZ
withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> "TIMESTAMP_NTZ") {
assert(inferSchema.inferField(DateType, "2003/02/05") == StringType)
}
withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> "true") {
withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) {
assert(inferSchema.inferField(DateType, "2003/02/05") == StringType)
}
assert(inferSchema.inferField(TimestampNTZType, "2012_12_12") == StringType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,8 +490,8 @@ object PartitioningUtils extends SQLConfHelper {
val unescapedRaw = unescapePathName(raw)
// try and parse the date, if no exception occurs this is a candidate to be resolved as
// TimestampType or TimestampNTZType. The inferred timestamp type is controlled by the conf
// "spark.sql.sources.timestampNTZTypeInference.enabled".
val timestampType = conf.timestampTypeInSchemaInference
// "spark.sql.timestampType".
val timestampType = conf.timestampType
timestampType match {
case TimestampType => timestampFormatter.parse(unescapedRaw)
case TimestampNTZType => timestampFormatter.parseWithoutTimeZone(unescapedRaw)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.TimestampNTZType

/**
* Options for the JDBC data source.
Expand Down Expand Up @@ -237,7 +238,7 @@ class JDBCOptions(
parameters
.get(JDBC_INFER_TIMESTAMP_NTZ)
.map(_.toBoolean)
.getOrElse(SQLConf.get.inferTimestampNTZInDataSources)
.getOrElse(SQLConf.get.timestampType == TimestampNTZType)
}

class JdbcOptionsInWrite(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1047,7 +1047,7 @@ abstract class CSVSuite
.option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
.save(path.getAbsolutePath)

withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> "true") {
withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> "TIMESTAMP_NTZ") {
val res = spark.read
.format("csv")
.option("inferSchema", "true")
Expand All @@ -1070,7 +1070,7 @@ abstract class CSVSuite
.option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
.save(path.getAbsolutePath)

withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> "false") {
withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> "TIMESTAMP_LTZ") {
val res = spark.read
.format("csv")
.option("inferSchema", "true")
Expand Down Expand Up @@ -1117,15 +1117,15 @@ abstract class CSVSuite
SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString,
SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString)

Seq(true, false).foreach { inferTimestampNTZ =>
withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> inferTimestampNTZ.toString) {
timestampTypes.foreach { timestampType =>
withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> timestampType) {
val res = spark.read
.format("csv")
.option("inferSchema", "true")
.option("header", "true")
.load(path.getAbsolutePath)

if (inferTimestampNTZ) {
if (timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) {
checkAnswer(res, exp)
} else {
checkAnswer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2782,7 +2782,7 @@ abstract class JsonSuite
.option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
.json(path.getAbsolutePath)

withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> "true") {
withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) {
val res = spark.read
.option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
.option("inferTimestamp", "true")
Expand All @@ -2804,7 +2804,7 @@ abstract class JsonSuite
.option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
.json(path.getAbsolutePath)

withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> "false") {
withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString) {
val res = spark.read
.option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
.option("inferTimestamp", "true")
Expand Down Expand Up @@ -2847,11 +2847,11 @@ abstract class JsonSuite
SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString,
SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString)

Seq(true, false).foreach { inferTimestampNTZ =>
withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> inferTimestampNTZ.toString) {
timestampTypes.foreach { timestampType =>
withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> timestampType) {
val res = spark.read.option("inferTimestamp", "true").json(path.getAbsolutePath)

if (inferTimestampNTZ) {
if (timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) {
checkAnswer(res, exp)
} else {
checkAnswer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,16 +82,21 @@ abstract class ParquetPartitionDiscoverySuite
check("1.5", DoubleType)
check("hello", StringType)
check("1990-02-24", DateType)
// The inferred timestamp type is controlled by `SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES`
Seq(false, true).foreach { inferTimestampNTZ =>
withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> inferTimestampNTZ.toString) {
val timestampType = if (inferTimestampNTZ) {
TimestampNTZType
} else {
TimestampType
}
check("1990-02-24 12:00:30", timestampType)
check("1990-02-24 12:00:30", timestampType, ZoneOffset.UTC)
// The inferred timestamp type is controlled by `SQLConf.TIMESTAMP_TYPE`
val timestampTypes = Seq(
SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString,
SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString)

timestampTypes.foreach { timestampType =>
withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> timestampType) {
val expectedTimestampType =
if (timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) {
TimestampNTZType
} else {
TimestampType
}
check("1990-02-24 12:00:30", expectedTimestampType)
check("1990-02-24 12:00:30", expectedTimestampType, ZoneOffset.UTC)
}
}

Expand Down Expand Up @@ -372,9 +377,14 @@ abstract class ParquetPartitionDiscoverySuite
s"hdfs://host:9000/path2"),
PartitionSpec.emptySpec)

// The inferred timestamp type is controlled by `SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES`
Seq(false, true).foreach { inferTimestampNTZ =>
withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> inferTimestampNTZ.toString) {
// The inferred timestamp type is controlled by `SQLConf.TIMESTAMP_TYPE`
val timestampTypes = Seq(
SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString,
SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString)

timestampTypes.foreach { timestampType =>
withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> timestampType) {
val inferTimestampNTZ = timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString
// The cases below check the resolution for type conflicts.
val t1 = if (!inferTimestampNTZ) {
Timestamp.valueOf("2014-01-01 00:00:00.0").getTime * 1000
Expand All @@ -396,7 +406,7 @@ abstract class ParquetPartitionDiscoverySuite
s"hdfs://host:9000/path/a=2014-01-01 00%3A01%3A00.0/b=$defaultPartitionName"),
PartitionSpec(
StructType(Seq(
StructField("a", SQLConf.get.timestampTypeInSchemaInference),
StructField("a", SQLConf.get.timestampType),
StructField("b", DecimalType(22, 0)))),
Seq(
Partition(
Expand Down Expand Up @@ -661,9 +671,14 @@ abstract class ParquetPartitionDiscoverySuite
}

test("Various partition value types") {
// The inferred timestamp type is controlled by `SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES`
Seq(false, true).foreach { inferTimestampNTZ =>
withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> inferTimestampNTZ.toString) {
// The inferred timestamp type is controlled by `SQLConf.TIMESTAMP_TYPE`
val timestampTypes = Seq(
SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString,
SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString)

timestampTypes.foreach { timestampType =>
withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> timestampType) {
val inferTimestampNTZ = timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString
val ts = if (!inferTimestampNTZ) {
new Timestamp(0)
} else {
Expand Down Expand Up @@ -696,7 +711,7 @@ abstract class ParquetPartitionDiscoverySuite
DecimalType(10, 5),
DecimalType.SYSTEM_DEFAULT,
DateType,
SQLConf.get.timestampTypeInSchemaInference,
SQLConf.get.timestampType,
StringType)

val partitionColumns = partitionColumnTypes.zipWithIndex.map {
Expand Down Expand Up @@ -727,8 +742,13 @@ abstract class ParquetPartitionDiscoverySuite
}

test("Various inferred partition value types") {
Seq(false, true).foreach { inferTimestampNTZ =>
withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> inferTimestampNTZ.toString) {
val timestampTypes = Seq(
SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString,
SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString)

timestampTypes.foreach { timestampType =>
withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> timestampType) {
val inferTimestampNTZ = timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString
val ts = if (!inferTimestampNTZ) {
Timestamp.valueOf("1990-02-24 12:00:30")
} else {
Expand All @@ -750,7 +770,7 @@ abstract class ParquetPartitionDiscoverySuite
DoubleType,
DecimalType(20, 0),
DateType,
SQLConf.get.timestampTypeInSchemaInference,
SQLConf.get.timestampType,
StringType)

val partitionColumns = partitionColumnTypes.zipWithIndex.map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1940,7 +1940,12 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
.option("url", urlWithUserAndPass)
.option("dbtable", tableName)

Seq(true, false).foreach { inferTimestampNTZ =>
val timestampTypes = Seq(
SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString,
SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString)

timestampTypes.foreach { timestampType =>
val inferTimestampNTZ = timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString
val tsType = if (inferTimestampNTZ) {
TimestampNTZType
} else {
Expand All @@ -1949,13 +1954,12 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
val res = readDf.option("inferTimestampNTZType", inferTimestampNTZ).load()
checkAnswer(res, Seq(Row(null)))
assert(res.schema.fields.head.dataType == tsType)
withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> inferTimestampNTZ.toString) {
withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> timestampType) {
val res2 = readDf.load()
checkAnswer(res2, Seq(Row(null)))
assert(res2.schema.fields.head.dataType == tsType)
}
}

}

test("SPARK-39339: TimestampNTZType with different local time zones") {
Expand Down Expand Up @@ -1986,7 +1990,8 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
.load()
checkAnswer(res, df)

withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> "true") {
withSQLConf(
SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) {
val res2 = spark.read.format("jdbc")
.option("url", urlWithUserAndPass)
.option("dbtable", tableName)
Expand Down