From da1850335d32720433a8896a5f4fddbf6bf0d9aa Mon Sep 17 00:00:00 2001
From: Marios Meimaris
Date: Thu, 20 May 2021 11:27:29 +0200
Subject: [PATCH 1/6] Override getJDBCType in MySQLDialect to map FloatType to
 FLOAT

---
 .../apache/spark/sql/jdbc/MySQLDialect.scala  | 22 ++++++++++++++++---
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 15 +++++--------
 2 files changed, 25 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index 71bba6f1105ba..6879750d7d2f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -17,9 +17,6 @@
 
 package org.apache.spark.sql.jdbc
 
-import java.sql.{SQLFeatureNotSupportedException, Types}
-import java.util.Locale
-
 import org.apache.spark.sql.types.{BooleanType, DataType, LongType, MetadataBuilder}
 
 private case object MySQLDialect extends JdbcDialect {
@@ -94,4 +91,23 @@ private case object MySQLDialect extends JdbcDialect {
   override def getTableCommentQuery(table: String, comment: String): String = {
     s"ALTER TABLE $table COMMENT = '$comment'"
   }
+
+  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
+    case IntegerType => Option(JdbcType("INTEGER", java.sql.Types.INTEGER))
+    case LongType => Option(JdbcType("BIGINT", java.sql.Types.BIGINT))
+    case DoubleType => Option(JdbcType("DOUBLE PRECISION", java.sql.Types.DOUBLE))
+    // See SPARK-35446: MySQL treats REAL as a synonym for DOUBLE by default
+    // We override getJDBCType so that FloatType is mapped to FLOAT instead
+    case FloatType => Option(JdbcType("FLOAT", java.sql.Types.FLOAT))
+    case ShortType => Option(JdbcType("INTEGER", java.sql.Types.SMALLINT))
+    case ByteType => Option(JdbcType("BYTE", java.sql.Types.TINYINT))
+    case BooleanType => Option(JdbcType("BIT(1)", java.sql.Types.BIT))
+    case StringType => Option(JdbcType("TEXT", java.sql.Types.CLOB))
+    case BinaryType => Option(JdbcType("BLOB", java.sql.Types.BLOB))
+    case TimestampType => Option(JdbcType("TIMESTAMP", java.sql.Types.TIMESTAMP))
+    case DateType => Option(JdbcType("DATE", java.sql.Types.DATE))
+    case t: DecimalType => Option(
+      JdbcType(s"DECIMAL(${t.precision},${t.scale})", java.sql.Types.DECIMAL))
+    case _ => None
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 5865ff245aa39..f620ac3a0cfbf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -17,21 +17,13 @@
 
 package org.apache.spark.sql.jdbc
 
-import java.math.BigDecimal
-import java.sql.{Date, DriverManager, SQLException, Timestamp}
-import java.time.{Instant, LocalDate}
-import java.util.{Calendar, GregorianCalendar, Properties, TimeZone}
-
-import scala.collection.JavaConverters._
-
 import org.h2.jdbc.JdbcSQLException
 import org.mockito.ArgumentMatchers._
 import org.mockito.Mockito._
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
-
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
-import org.apache.spark.sql.catalyst.{analysis, TableIdentifier}
+import org.apache.spark.sql.catalyst.{TableIdentifier, analysis}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.ShowCreateTable
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeTestUtils}
@@ -899,6 +891,11 @@ class JDBCSuite extends QueryTest
     Option(TimestampType))
   }
 
+  test("MySQLDialect type mapping") {
+    val mySqlDialect = JdbcDialects.get("jdbc:mysql://127.0.0.1/db")
+    assert(mySqlDialect.getJDBCType(FloatType).map(_.databaseTypeDefinition).get == "FLOAT")
+  }
+
   test("PostgresDialect type mapping") {
     val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db")
     val md = new MetadataBuilder().putLong("scale", 0)
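Note (editorial, not part of the diffs): the MySQL behavior cited in the new comment, REAL silently behaving as DOUBLE, can be reproduced with plain JDBC. A minimal sketch, assuming a reachable MySQL server at a placeholder URL, hypothetical credentials and table name, and MySQL Connector/J on the classpath:

    import java.sql.DriverManager

    val conn = DriverManager.getConnection(
      "jdbc:mysql://127.0.0.1/db", "user", "password") // placeholders
    try {
      val stmt = conn.createStatement()
      // REAL is stored as DOUBLE unless the REAL_AS_FLOAT SQL mode is set
      stmt.executeUpdate("CREATE TABLE real_vs_float (a REAL, b FLOAT)")
      val rs = stmt.executeQuery("SHOW COLUMNS FROM real_vs_float")
      while (rs.next()) {
        // Expected output: a -> double, b -> float
        println(s"${rs.getString("Field")} -> ${rs.getString("Type")}")
      }
    } finally {
      conn.close()
    }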
From 615338aa8c527dae28c975692003b4b3859d7724 Mon Sep 17 00:00:00 2001
From: Marios Meimaris
Date: Thu, 20 May 2021 11:56:08 +0200
Subject: [PATCH 2/6] re-trigger tests

From 795948bdac71ea97e231add793bcdef2968dacf8 Mon Sep 17 00:00:00 2001
From: Marios Meimaris
Date: Thu, 20 May 2021 12:01:00 +0200
Subject: [PATCH 3/6] revert import changes

---
 .../scala/org/apache/spark/sql/jdbc/MySQLDialect.scala |  3 +++
 .../scala/org/apache/spark/sql/jdbc/JDBCSuite.scala    | 10 +++++++++-
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index 6879750d7d2f0..afed20d79460d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.jdbc
 
+import java.sql.{SQLFeatureNotSupportedException, Types}
+import java.util.Locale
+
 import org.apache.spark.sql.types.{BooleanType, DataType, LongType, MetadataBuilder}
 
 private case object MySQLDialect extends JdbcDialect {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index f620ac3a0cfbf..271fe080b1c94 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -17,13 +17,21 @@
 
 package org.apache.spark.sql.jdbc
 
+import java.math.BigDecimal
+import java.sql.{Date, DriverManager, SQLException, Timestamp}
+import java.time.{Instant, LocalDate}
+import java.util.{Calendar, GregorianCalendar, Properties, TimeZone}
+
+import scala.collection.JavaConverters._
+
 import org.h2.jdbc.JdbcSQLException
 import org.mockito.ArgumentMatchers._
 import org.mockito.Mockito._
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
+
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
-import org.apache.spark.sql.catalyst.{TableIdentifier, analysis}
+import org.apache.spark.sql.catalyst.{analysis, TableIdentifier}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.ShowCreateTable
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeTestUtils}
From 30e6717bd718aa07d4f86016b0f8f47c3824ee05 Mon Sep 17 00:00:00 2001
From: Marios Meimaris
Date: Thu, 20 May 2021 12:09:15 +0200
Subject: [PATCH 4/6] added type imports

---
 .../src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index afed20d79460d..7f679e366444f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.jdbc
 import java.sql.{SQLFeatureNotSupportedException, Types}
 import java.util.Locale
 
-import org.apache.spark.sql.types.{BooleanType, DataType, LongType, MetadataBuilder}
+import org.apache.spark.sql.types._
 
 private case object MySQLDialect extends JdbcDialect {
 

From 60afab1b4a93047e859ef8a1aeb8f6c19378e0ba Mon Sep 17 00:00:00 2001
From: Marios Meimaris
Date: Tue, 25 May 2021 16:47:13 +0200
Subject: [PATCH 5/6] Addressed comments

---
 .../apache/spark/sql/jdbc/MySQLDialect.scala | 17 +++--------------
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala |  2 +-
 2 files changed, 4 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index 7f679e366444f..81b816449f59e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -20,7 +20,8 @@ package org.apache.spark.sql.jdbc
 import java.sql.{SQLFeatureNotSupportedException, Types}
 import java.util.Locale
 
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
+import org.apache.spark.sql.types.{BooleanType, DataType, FloatType, LongType, MetadataBuilder}
 
 private case object MySQLDialect extends JdbcDialect {
 
@@ -96,21 +97,9 @@ private case object MySQLDialect extends JdbcDialect {
   }
 
   override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
-    case IntegerType => Option(JdbcType("INTEGER", java.sql.Types.INTEGER))
-    case LongType => Option(JdbcType("BIGINT", java.sql.Types.BIGINT))
-    case DoubleType => Option(JdbcType("DOUBLE PRECISION", java.sql.Types.DOUBLE))
     // See SPARK-35446: MySQL treats REAL as a synonym for DOUBLE by default
     // We override getJDBCType so that FloatType is mapped to FLOAT instead
     case FloatType => Option(JdbcType("FLOAT", java.sql.Types.FLOAT))
-    case ShortType => Option(JdbcType("INTEGER", java.sql.Types.SMALLINT))
-    case ByteType => Option(JdbcType("BYTE", java.sql.Types.TINYINT))
-    case BooleanType => Option(JdbcType("BIT(1)", java.sql.Types.BIT))
-    case StringType => Option(JdbcType("TEXT", java.sql.Types.CLOB))
-    case BinaryType => Option(JdbcType("BLOB", java.sql.Types.BLOB))
-    case TimestampType => Option(JdbcType("TIMESTAMP", java.sql.Types.TIMESTAMP))
-    case DateType => Option(JdbcType("DATE", java.sql.Types.DATE))
-    case t: DecimalType => Option(
-      JdbcType(s"DECIMAL(${t.precision},${t.scale})", java.sql.Types.DECIMAL))
-    case _ => None
+    case _ => JdbcUtils.getCommonJDBCType(dt)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 271fe080b1c94..f85c479d60df5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -899,7 +899,7 @@ class JDBCSuite extends QueryTest
     Option(TimestampType))
   }
 
-  test("MySQLDialect type mapping") {
+  test("SPARK-35446: MySQLDialect type mapping of float") {
     val mySqlDialect = JdbcDialects.get("jdbc:mysql://127.0.0.1/db")
     assert(mySqlDialect.getJDBCType(FloatType).map(_.databaseTypeDefinition).get == "FLOAT")
   }
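Note (editorial, not part of the diffs): after patch 5, only FloatType takes the MySQL-specific branch; every other type falls through the wildcard case to JdbcUtils.getCommonJDBCType. A quick REPL-style check, a sketch assuming a Spark build that contains this patch:

    import org.apache.spark.sql.jdbc.JdbcDialects
    import org.apache.spark.sql.types.{FloatType, LongType}

    val dialect = JdbcDialects.get("jdbc:mysql://127.0.0.1/db")

    // MySQL-specific branch introduced by this series: FLOAT, not REAL
    assert(dialect.getJDBCType(FloatType).map(_.databaseTypeDefinition).contains("FLOAT"))

    // Wildcard branch: the common mapping still yields BIGINT for LongType
    assert(dialect.getJDBCType(LongType).map(_.databaseTypeDefinition).contains("BIGINT"))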
a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index a5b2d8be7aaa5..e6c312abf1698 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -92,6 +92,8 @@ license: |
   - In Spark 3.2, `CREATE TABLE AS SELECT` with non-empty `LOCATION` will throw `AnalysisException`. To restore the behavior before Spark 3.2, you can set `spark.sql.legacy.allowNonEmptyLocationInCTAS` to `true`.
 
   - In Spark 3.2, special datetime values such as `epoch`, `today`, `yesterday`, `tomorrow`, and `now` are supported in typed literals only, for instance, `select timestamp'now'`. In Spark 3.1 and 3.0, such special values are supported in any casts of strings to dates/timestamps. To keep these special values as dates/timestamps in Spark 3.1 and 3.0, you should replace them manually, e.g. `if (c in ('now', 'today'), current_date(), cast(c as date))`.
+
+  - In Spark 3.2, `FloatType` is mapped to `FLOAT` in MySQL. Prior to Spark 3.2, it was mapped to `REAL`, which MySQL by default treats as a synonym for `DOUBLE PRECISION`.
 
 ## Upgrading from Spark SQL 3.0 to 3.1
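Note (editorial, not part of the diffs): the migration note can be observed end to end through the JDBC write path. A hypothetical sketch, with placeholder URL, credentials, and table name, assuming a reachable MySQL server and the MySQL JDBC driver on the classpath:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // A single FloatType column
    val df = Seq(1.5f, 2.5f).toDF("value")

    // In Spark 3.2 the generated DDL declares `value FLOAT`; in 3.1 and
    // earlier it declared `value REAL`, which MySQL by default stores as DOUBLE.
    df.write
      .format("jdbc")
      .option("url", "jdbc:mysql://127.0.0.1/db")
      .option("dbtable", "floats")
      .option("user", "user")
      .option("password", "password")
      .save()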