From 7260e2f4f22cb5e9234ce547d9623bd686967155 Mon Sep 17 00:00:00 2001 From: Ivan Sadikov Date: Thu, 25 Aug 2022 17:42:51 +1200 Subject: [PATCH 1/3] add configs --- .../sql/catalyst/csv/UnivocityParser.scala | 20 +++++++----- .../sql/catalyst/json/JacksonParser.scala | 20 +++++++----- .../apache/spark/sql/internal/SQLConf.scala | 22 +++++++++++++ .../execution/datasources/csv/CSVSuite.scala | 32 +++++++++++++++++++ .../datasources/json/JsonSuite.scala | 32 +++++++++++++++++++ 5 files changed, 110 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index c9955d72524c..9d855d1a93d6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -124,15 +124,19 @@ class UnivocityParser( // dates and timestamps. // For more information, see comments for "enableDateTimeParsingFallback" option in CSVOptions. private val enableParsingFallbackForTimestampType = - options.enableDateTimeParsingFallback.getOrElse { - SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY || - options.timestampFormatInRead.isEmpty - } + options.enableDateTimeParsingFallback + .orElse(SQLConf.get.csvEnableDateTimeParsingFallback) + .getOrElse { + SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY || + options.timestampFormatInRead.isEmpty + } private val enableParsingFallbackForDateType = - options.enableDateTimeParsingFallback.getOrElse { - SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY || - options.dateFormatInRead.isEmpty - } + options.enableDateTimeParsingFallback + .orElse(SQLConf.get.csvEnableDateTimeParsingFallback) + .getOrElse { + SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY || + options.dateFormatInRead.isEmpty + } // Retrieve the raw record string. private def getCurrentInput: UTF8String = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index 06133d44c13a..f8adac1ee44f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -82,15 +82,19 @@ class JacksonParser( // dates and timestamps. // For more information, see comments for "enableDateTimeParsingFallback" option in JSONOptions. private val enableParsingFallbackForTimestampType = - options.enableDateTimeParsingFallback.getOrElse { - SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY || - options.timestampFormatInRead.isEmpty - } + options.enableDateTimeParsingFallback + .orElse(SQLConf.get.jsonEnableDateTimeParsingFallback) + .getOrElse { + SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY || + options.timestampFormatInRead.isEmpty + } private val enableParsingFallbackForDateType = - options.enableDateTimeParsingFallback.getOrElse { - SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY || - options.dateFormatInRead.isEmpty - } + options.enableDateTimeParsingFallback + .orElse(SQLConf.get.jsonEnableDateTimeParsingFallback) + .getOrElse { + SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY || + options.dateFormatInRead.isEmpty + } /** * Create a converter which converts the JSON documents held by the `JsonParser` diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 31bdbca4a256..f8e128a05989 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3520,6 +3520,22 @@ object SQLConf { .booleanConf .createWithDefault(true) + val CSV_ENABLE_DATE_TIME_PARSING_FALLBACK = + buildConf("spark.sql.csv.enableDateTimeParsingFallback") + .internal() + .doc("When set to true, enables legacy date/time parsing fallback in CSV") + .version("3.4.0") + .booleanConf + .createOptional + + val JSON_ENABLE_DATE_TIME_PARSING_FALLBACK = + buildConf("spark.sql.json.enableDateTimeParsingFallback") + .internal() + .doc("When set to true, enables legacy date/time parsing fallback in JSON") + .version("3.4.0") + .booleanConf + .createOptional + val ADD_PARTITION_BATCH_SIZE = buildConf("spark.sql.addPartitionInBatch.size") .internal() @@ -4621,6 +4637,12 @@ class SQLConf extends Serializable with Logging { def avroFilterPushDown: Boolean = getConf(AVRO_FILTER_PUSHDOWN_ENABLED) + def jsonEnableDateTimeParsingFallback: Option[Boolean] = + getConf(JSON_ENABLE_DATE_TIME_PARSING_FALLBACK) + + def csvEnableDateTimeParsingFallback: Option[Boolean] = + getConf(CSV_ENABLE_DATE_TIME_PARSING_FALLBACK) + def integerGroupingIdEnabled: Boolean = getConf(SQLConf.LEGACY_INTEGER_GROUPING_ID) def metadataCacheTTL: Long = getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 0068f57a7697..eca48b55c6c4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -2949,6 +2949,38 @@ abstract class CSVSuite ) } } + + test("SPARK-40215: enable parsing fallback for CSV in CORRECTED mode with a SQL config") { + withTempPath { path => + Seq("2020-01-01,2020-01-01").toDF() + .repartition(1) + .write.text(path.getAbsolutePath) + + for (fallbackEnabled <- Seq(true, false)) { + withSQLConf( + SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "CORRECTED", + SQLConf.CSV_ENABLE_DATE_TIME_PARSING_FALLBACK.key -> s"$fallbackEnabled") { + val df = spark.read + .schema("date date, ts timestamp") + .option("dateFormat", "invalid") + .option("timestampFormat", "invalid") + .csv(path.getAbsolutePath) + + if (fallbackEnabled) { + checkAnswer( + df, + Seq(Row(Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00"))) + ) + } else { + checkAnswer( + df, + Seq(Row(null, null)) + ) + } + } + } + } + } } class CSVv1Suite extends CSVSuite { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 02225d40c831..ecb580a3dbac 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -3322,6 +3322,38 @@ abstract class JsonSuite ) } } + + test("SPARK-40215: enable parsing fallback for JSON in CORRECTED mode with a SQL config") { + withTempPath { path => + Seq("""{"date": "2020-01-01", "ts": "2020-01-01"}""").toDF() + .repartition(1) + .write.text(path.getAbsolutePath) + + for (fallbackEnabled <- Seq(true, false)) { + withSQLConf( + SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "CORRECTED", + SQLConf.JSON_ENABLE_DATE_TIME_PARSING_FALLBACK.key -> s"$fallbackEnabled") { + val df = spark.read + .schema("date date, ts timestamp") + .option("dateFormat", "invalid") + .option("timestampFormat", "invalid") + .json(path.getAbsolutePath) + + if (fallbackEnabled) { + checkAnswer( + df, + Seq(Row(Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00"))) + ) + } else { + checkAnswer( + df, + Seq(Row(null, null)) + ) + } + } + } + } + } } class JsonV1Suite extends JsonSuite { From 4bffcc455f18b41e8a6813e69ce062ae5b344b26 Mon Sep 17 00:00:00 2001 From: Ivan Sadikov Date: Thu, 25 Aug 2022 17:59:09 +1200 Subject: [PATCH 2/3] change config docs --- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index f8e128a05989..abaaf79c5299 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3523,7 +3523,7 @@ object SQLConf { val CSV_ENABLE_DATE_TIME_PARSING_FALLBACK = buildConf("spark.sql.csv.enableDateTimeParsingFallback") .internal() - .doc("When set to true, enables legacy date/time parsing fallback in CSV") + .doc("When true, enable legacy date/time parsing fallback in CSV") .version("3.4.0") .booleanConf .createOptional @@ -3531,7 +3531,7 @@ object SQLConf { val JSON_ENABLE_DATE_TIME_PARSING_FALLBACK = buildConf("spark.sql.json.enableDateTimeParsingFallback") .internal() - .doc("When set to true, enables legacy date/time parsing fallback in JSON") + .doc("When true, enable legacy date/time parsing fallback in JSON") .version("3.4.0") .booleanConf .createOptional From 48f320489b9ea925a4cc2e8ec280d7181591c799 Mon Sep 17 00:00:00 2001 From: Ivan Sadikov Date: Fri, 26 Aug 2022 16:56:32 +1200 Subject: [PATCH 3/3] move to spark.sql.legacy namespace --- .../org/apache/spark/sql/internal/SQLConf.scala | 12 ++++++------ .../sql/execution/datasources/csv/CSVSuite.scala | 2 +- .../sql/execution/datasources/json/JsonSuite.scala | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index abaaf79c5299..eb7a6a9105e8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3520,16 +3520,16 @@ object SQLConf { .booleanConf .createWithDefault(true) - val CSV_ENABLE_DATE_TIME_PARSING_FALLBACK = - buildConf("spark.sql.csv.enableDateTimeParsingFallback") + val LEGACY_CSV_ENABLE_DATE_TIME_PARSING_FALLBACK = + buildConf("spark.sql.legacy.csv.enableDateTimeParsingFallback") .internal() .doc("When true, enable legacy date/time parsing fallback in CSV") .version("3.4.0") .booleanConf .createOptional - val JSON_ENABLE_DATE_TIME_PARSING_FALLBACK = - buildConf("spark.sql.json.enableDateTimeParsingFallback") + val LEGACY_JSON_ENABLE_DATE_TIME_PARSING_FALLBACK = + buildConf("spark.sql.legacy.json.enableDateTimeParsingFallback") .internal() .doc("When true, enable legacy date/time parsing fallback in JSON") .version("3.4.0") @@ -4638,10 +4638,10 @@ class SQLConf extends Serializable with Logging { def avroFilterPushDown: Boolean = getConf(AVRO_FILTER_PUSHDOWN_ENABLED) def jsonEnableDateTimeParsingFallback: Option[Boolean] = - getConf(JSON_ENABLE_DATE_TIME_PARSING_FALLBACK) + getConf(LEGACY_JSON_ENABLE_DATE_TIME_PARSING_FALLBACK) def csvEnableDateTimeParsingFallback: Option[Boolean] = - getConf(CSV_ENABLE_DATE_TIME_PARSING_FALLBACK) + getConf(LEGACY_CSV_ENABLE_DATE_TIME_PARSING_FALLBACK) def integerGroupingIdEnabled: Boolean = getConf(SQLConf.LEGACY_INTEGER_GROUPING_ID) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index eca48b55c6c4..5c97821f11ec 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -2959,7 +2959,7 @@ abstract class CSVSuite for (fallbackEnabled <- Seq(true, false)) { withSQLConf( SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "CORRECTED", - SQLConf.CSV_ENABLE_DATE_TIME_PARSING_FALLBACK.key -> s"$fallbackEnabled") { + SQLConf.LEGACY_CSV_ENABLE_DATE_TIME_PARSING_FALLBACK.key -> s"$fallbackEnabled") { val df = spark.read .schema("date date, ts timestamp") .option("dateFormat", "invalid") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index ecb580a3dbac..f0801ae313e8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -3332,7 +3332,7 @@ abstract class JsonSuite for (fallbackEnabled <- Seq(true, false)) { withSQLConf( SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "CORRECTED", - SQLConf.JSON_ENABLE_DATE_TIME_PARSING_FALLBACK.key -> s"$fallbackEnabled") { + SQLConf.LEGACY_JSON_ENABLE_DATE_TIME_PARSING_FALLBACK.key -> s"$fallbackEnabled") { val df = spark.read .schema("date date, ts timestamp") .option("dateFormat", "invalid")