From 2e05a85579570705026a7628135f93d303f34537 Mon Sep 17 00:00:00 2001
From: "YIHAODIAN\\wangshuangshuang"
Date: Tue, 27 Jun 2017 17:58:57 +0800
Subject: [PATCH 01/11] Spark-19726: Insert null timestamp value into mysql using null instead of spark_default_timestamp_value

---
 .../execution/datasources/jdbc/JDBCRDD.scala  |  8 +-
 .../spark/sql/sources/JdbcInsertSuite.scala   | 82 +++++++++++++++++++
 2 files changed, 89 insertions(+), 1 deletion(-)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/sources/JdbcInsertSuite.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 2bdc43254133e..4241a98543a43 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -61,7 +61,13 @@ object JDBCRDD extends Logging {
     try {
       val rs = statement.executeQuery()
       try {
-        JdbcUtils.getSchema(rs, dialect)
+        val metaStructType = JdbcUtils.getSchema(rs, dialect)
+        StructType(metaStructType.map(f =>
+          if(f.nullable)
+            f
+          else
+            StructField(f.name, f.dataType, true, f.metadata)
+        ))
       } finally {
         rs.close()
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/JdbcInsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/JdbcInsertSuite.scala
new file mode 100644
index 0000000000000..d5f10d24e61ca
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/JdbcInsertSuite.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import java.io.File
+import java.sql.DriverManager
+import java.util.Properties
+
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.util.Utils
+import org.scalatest.BeforeAndAfter
+
+class JdbcInsertSuite extends DataSourceTest with BeforeAndAfter with SharedSQLContext {
+  import testImplicits._
+
+  val url = "jdbc:h2:mem:testdb0"
+  val urlWithUserAndPass = "jdbc:h2:mem:testdb0;user=testUser;password=testPass"
+  var conn: java.sql.Connection = null
+
+
+  protected override lazy val sql = spark.sql _
+
+  before {
+    Utils.classForName("org.h2.Driver")
+    val properties = new Properties()
+    properties.setProperty("user", "testUser")
+    properties.setProperty("password", "testPass")
+    properties.setProperty("rowId", "false")
+
+    conn = DriverManager.getConnection(url, properties)
+    conn.prepareStatement("create schema test").executeUpdate()
+    conn.prepareStatement(
+      "create table test.timetest (name TEXT(32) NOT NULL, time_stamp TIMESTAMP NOT NULL)").
+        executeUpdate()
+
+    conn.commit()
+
+    sql(
+      s"""
+        |CREATE OR REPLACE TEMPORARY VIEW jdbcTable
+        |USING org.apache.spark.sql.jdbc
+        |OPTIONS (url '$url', dbtable 'test.timetest', user 'testUser', password 'testPass')
+      """.stripMargin.replaceAll("\n", " "))
+  }
+  after {
+    conn.prepareStatement("drop table test.timetest").executeUpdate()
+    conn.prepareStatement("drop schema test").executeUpdate()
+
+    conn.commit()
+    conn.close()
+  }
+  test("insert into null timestamp") {
+    var testPassed = false
+    try{
+      sql(
+        s"""
+          |INSERT INTO jdbcTable values(123, null)
+        """.stripMargin)
+    } catch {
+      case e: Exception =>
+        testPassed = true
+    }
+    assert(testPassed, "Throw Null Exception Correctly!")
+  }
+}
+
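
A note on the approach in this first revision: rather than touching the JDBC write path, it forces every column of the schema returned by resolveTable to be nullable, so a NULL in an incoming row reaches the driver and the database itself decides whether to reject it. The same rewrite can be expressed more compactly with StructField.copy; a minimal sketch (the helper name is illustrative, not part of the patch):

    import org.apache.spark.sql.types.StructType

    // Equivalent of the map over metaStructType above: keep name, type and
    // metadata, but mark every field nullable.
    def forceNullable(schema: StructType): StructType =
      StructType(schema.map(_.copy(nullable = true)))
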
From 588ccad369f3f8eb8c38c2f01748e5262ee5a676 Mon Sep 17 00:00:00 2001
From: "YIHAODIAN\\wangshuangshuang"
Date: Wed, 28 Jun 2017 10:30:03 +0800
Subject: [PATCH 02/11] Spark-19726: modify unit tests to expect a caught exception

---
 .../spark/sql/sources/JdbcInsertSuite.scala | 29 +++++++++++--------
 1 file changed, 17 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/JdbcInsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/JdbcInsertSuite.scala
index d5f10d24e61ca..42cb82929f073 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/JdbcInsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/JdbcInsertSuite.scala
@@ -46,7 +46,7 @@ class JdbcInsertSuite extends DataSourceTest with BeforeAndAfter with SharedSQLC
     conn = DriverManager.getConnection(url, properties)
     conn.prepareStatement("create schema test").executeUpdate()
     conn.prepareStatement(
-      "create table test.timetest (name TEXT(32) NOT NULL, time_stamp TIMESTAMP NOT NULL)").
+      "create table test.timestamp_test (id bigint(11) DEFAULT NULL, time_stamp TIMESTAMP NOT NULL)").
         executeUpdate()

     conn.commit()
@@ -55,28 +55,33 @@ class JdbcInsertSuite extends DataSourceTest with BeforeAndAfter with SharedSQLC
     sql(
       s"""
         |CREATE OR REPLACE TEMPORARY VIEW jdbcTable
         |USING org.apache.spark.sql.jdbc
-        |OPTIONS (url '$url', dbtable 'test.timetest', user 'testUser', password 'testPass')
+        |OPTIONS (url '$url', dbtable 'test.timestamp_test', user 'testUser', password 'testPass')
       """.stripMargin.replaceAll("\n", " "))
   }
+
   after {
-    conn.prepareStatement("drop table test.timetest").executeUpdate()
+    spark.catalog.dropTempView("jdbcTable")
+
+    conn.prepareStatement("drop table test.timestamp_test").executeUpdate()
     conn.prepareStatement("drop schema test").executeUpdate()

     conn.commit()
     conn.close()
   }
-  test("insert into null timestamp") {
-    var testPassed = false
-    try{
+
+  test("SPARK-19726 - Failed to insert null timestamp value to mysql using spark jdbc") {
+
+    val message = intercept[Exception] {
       sql(
         s"""
-          |INSERT INTO jdbcTable values(123, null)
+          |INSERT INTO jdbcTable values(222, null)
         """.stripMargin)
-    } catch {
-      case e: Exception =>
-        testPassed = true
-    }
-    assert(testPassed, "Throw Null Exception Correctly!")
+    }.getMessage
+
+    assert(
+      message.contains("NULL not allowed for column \"TIME_STAMP\""),
+      "It is not allowed to insert into null into timestamp column which is defined not null."
+    )
   }
 }
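
The reworked test relies on ScalaTest's intercept, which fails the test when no exception (or the wrong type) is thrown and returns the caught exception for further assertions. A minimal self-contained sketch of the idiom, outside Spark:

    import org.scalatest.Assertions._

    // require(false, ...) throws IllegalArgumentException; intercept returns
    // it so the message can be inspected.
    val ex = intercept[IllegalArgumentException] {
      require(false, "boom")
    }
    assert(ex.getMessage.contains("boom"))
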
From 3b1785cae43c1d027687f5518c33bd7d743d002b Mon Sep 17 00:00:00 2001
From: "YIHAODIAN\\wangshuangshuang"
Date: Wed, 28 Jun 2017 11:06:24 +0800
Subject: [PATCH 03/11] Spark-19726: modify unit test table name to match the JDBC table

---
 .../org/apache/spark/sql/sources/JdbcInsertSuite.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/JdbcInsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/JdbcInsertSuite.scala
index 42cb82929f073..956e2485940cb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/JdbcInsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/JdbcInsertSuite.scala
@@ -53,7 +53,7 @@ class JdbcInsertSuite extends DataSourceTest with BeforeAndAfter with SharedSQLC

     sql(
       s"""
-        |CREATE OR REPLACE TEMPORARY VIEW jdbcTable
+        |CREATE OR REPLACE TEMPORARY VIEW timestamp_test
         |USING org.apache.spark.sql.jdbc
         |OPTIONS (url '$url', dbtable 'test.timestamp_test', user 'testUser', password 'testPass')
       """.stripMargin.replaceAll("\n", " "))
@@ -74,13 +74,13 @@ class JdbcInsertSuite extends DataSourceTest with BeforeAndAfter with SharedSQLC
     val message = intercept[Exception] {
       sql(
         s"""
-          |INSERT INTO jdbcTable values(222, null)
+          |INSERT INTO timestamp_test values(111, null)
         """.stripMargin)
     }.getMessage

     assert(
       message.contains("NULL not allowed for column \"TIME_STAMP\""),
-      "It is not allowed to insert into null into timestamp column which is defined not null."
+      "It is not allowed to insert null into timestamp column which is defined not null."
     )
   }
 }
From 60b007d18ef09c170ddca17d5567bf30b5b8b993 Mon Sep 17 00:00:00 2001
From: "YIHAODIAN\\wangshuangshuang"
Date: Wed, 28 Jun 2017 13:07:04 +0800
Subject: [PATCH 04/11] Spark-19726: add a new boolean parameter alwaysNullable to getSchema

---
 .../execution/datasources/jdbc/JDBCRDD.scala  |  8 +-
 .../datasources/jdbc/JdbcUtils.scala          |  5 +-
 .../spark/sql/jdbc/JDBCWriteSuite.scala       | 11 ++-
 .../spark/sql/sources/JdbcInsertSuite.scala   | 87 -------------------
 4 files changed, 14 insertions(+), 97 deletions(-)
 delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/sources/JdbcInsertSuite.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 4241a98543a43..2bdc43254133e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -61,13 +61,7 @@ object JDBCRDD extends Logging {
     try {
       val rs = statement.executeQuery()
       try {
-        val metaStructType = JdbcUtils.getSchema(rs, dialect)
-        StructType(metaStructType.map(f =>
-          if(f.nullable)
-            f
-          else
-            StructField(f.name, f.dataType, true, f.metadata)
-        ))
+        JdbcUtils.getSchema(rs, dialect)
       } finally {
         rs.close()
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index ca61c2efe2ddf..b705d1e2dcc3e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -273,6 +273,9 @@ object JdbcUtils extends Logging {
     val rsmd = resultSet.getMetaData
     val ncols = rsmd.getColumnCount
     val fields = new Array[StructField](ncols)
+    // if true, spark will propagate null to underlying DB engine instead of using type default value
+    val alwaysNullable = true
+
     var i = 0
     while (i < ncols) {
       val columnName = rsmd.getColumnLabel(i + 1)
@@ -290,7 +293,7 @@ object JdbcUtils extends Logging {
             rsmd.getClass.getName == "org.apache.hive.jdbc.HiveResultSetMetaData" => true
         }
       }
-      val nullable = rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
+      val nullable = if (alwaysNullable) alwaysNullable else rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
       val metadata = new MetadataBuilder()
         .putString("name", columnName)
         .putLong("scale", fieldScale)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index bf1fd160704fa..169aaf6c1c6f5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -20,10 +20,10 @@ package org.apache.spark.sql.jdbc
 import java.sql.{Date, DriverManager, Timestamp}
 import java.util.Properties

-import scala.collection.JavaConverters.propertiesAsScalaMapConverter
+import org.apache.spark.SparkException

+import scala.collection.JavaConverters.propertiesAsScalaMapConverter
 import org.scalatest.BeforeAndAfter
-
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
@@ -506,4 +506,11 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
         "schema struct"))
     }
   }
+
+  test("SPARK-19726: INSERT null to a NOT NULL column") {
+    val e = intercept[SparkException] {
+      sql("INSERT INTO PEOPLE1 values (null, null)")
+    }.getMessage
+    assert(e.contains("NULL not allowed for column \"NAME\""))
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/JdbcInsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/JdbcInsertSuite.scala
deleted file mode 100644
index 956e2485940cb..0000000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/JdbcInsertSuite.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources
-
-import java.io.File
-import java.sql.DriverManager
-import java.util.Properties
-
-import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.{AnalysisException, Row}
-import org.apache.spark.util.Utils
-import org.scalatest.BeforeAndAfter
-
-class JdbcInsertSuite extends DataSourceTest with BeforeAndAfter with SharedSQLContext {
-  import testImplicits._
-
-  val url = "jdbc:h2:mem:testdb0"
-  val urlWithUserAndPass = "jdbc:h2:mem:testdb0;user=testUser;password=testPass"
-  var conn: java.sql.Connection = null
-
-
-  protected override lazy val sql = spark.sql _
-
-  before {
-    Utils.classForName("org.h2.Driver")
-    val properties = new Properties()
-    properties.setProperty("user", "testUser")
-    properties.setProperty("password", "testPass")
-    properties.setProperty("rowId", "false")
-
-    conn = DriverManager.getConnection(url, properties)
-    conn.prepareStatement("create schema test").executeUpdate()
-    conn.prepareStatement(
-      "create table test.timestamp_test (id bigint(11) DEFAULT NULL, time_stamp TIMESTAMP NOT NULL)").
-        executeUpdate()
-
-    conn.commit()
-
-    sql(
-      s"""
-        |CREATE OR REPLACE TEMPORARY VIEW timestamp_test
-        |USING org.apache.spark.sql.jdbc
-        |OPTIONS (url '$url', dbtable 'test.timestamp_test', user 'testUser', password 'testPass')
-      """.stripMargin.replaceAll("\n", " "))
-  }
-
-  after {
-    spark.catalog.dropTempView("jdbcTable")
-
-    conn.prepareStatement("drop table test.timestamp_test").executeUpdate()
-    conn.prepareStatement("drop schema test").executeUpdate()
-
-    conn.commit()
-    conn.close()
-  }
-
-  test("SPARK-19726 - Failed to insert null timestamp value to mysql using spark jdbc") {
-
-    val message = intercept[Exception] {
-      sql(
-        s"""
-          |INSERT INTO timestamp_test values(111, null)
-        """.stripMargin)
-    }.getMessage
-
-    assert(
-      message.contains("NULL not allowed for column \"TIME_STAMP\""),
-      "It is not allowed to insert null into timestamp column which is defined not null."
-    )
-  }
-}
-
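
The decision rule this revision hard-codes into getSchema reads oddly because the then-branch returns the flag itself; the whole if/else is equivalent to a single disjunction, which also makes plain why the flag draws review attention in a later revision (a sketch, with rsmd and i as in the surrounding loop):

    // alwaysNullable short-circuits the driver's verdict: a column is
    // nullable if the flag is set, or if the driver does not report NOT NULL.
    val nullable = alwaysNullable ||
      rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
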
From d9526f1aa6f31bd802905023fcbd01c669b58b86 Mon Sep 17 00:00:00 2001
From: "YIHAODIAN\\wangshuangshuang"
Date: Wed, 28 Jun 2017 13:23:05 +0800
Subject: [PATCH 05/11] Spark-19726: fix scala style check errors

---
 .../spark/sql/execution/datasources/jdbc/JdbcUtils.scala | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index b705d1e2dcc3e..8c39654e37b56 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -273,7 +273,8 @@ object JdbcUtils extends Logging {
     val rsmd = resultSet.getMetaData
     val ncols = rsmd.getColumnCount
     val fields = new Array[StructField](ncols)
-    // if true, spark will propagate null to underlying DB engine instead of using type default value
+    // if true, spark will propagate null to underlying
+    // DB engine instead of using type default value
     val alwaysNullable = true

     var i = 0
@@ -293,7 +294,11 @@ object JdbcUtils extends Logging {
             rsmd.getClass.getName == "org.apache.hive.jdbc.HiveResultSetMetaData" => true
         }
       }
-      val nullable = if (alwaysNullable) alwaysNullable else rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
+      val nullable =
+        if (alwaysNullable)
+          alwaysNullable
+        else
+          rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
       val metadata = new MetadataBuilder()
         .putString("name", columnName)
         .putLong("scale", fieldScale)
From dde91236ecf54d9ede76bf888dda1499c7cd225f Mon Sep 17 00:00:00 2001
From: "YIHAODIAN\\wangshuangshuang"
Date: Wed, 28 Jun 2017 13:35:37 +0800
Subject: [PATCH 06/11] Spark-19726: fix scala style check errors

---
 .../spark/sql/execution/datasources/jdbc/JdbcUtils.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 8c39654e37b56..a3e351c70dd86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -294,11 +294,12 @@ object JdbcUtils extends Logging {
             rsmd.getClass.getName == "org.apache.hive.jdbc.HiveResultSetMetaData" => true
         }
       }
-      val nullable =
+      val nullable = {
         if (alwaysNullable)
           alwaysNullable
         else
           rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
+      }
       val metadata = new MetadataBuilder()
         .putString("name", columnName)
         .putLong("scale", fieldScale)
From 02eeaffe0f94907df8f3a575ad1f00928b146b6d Mon Sep 17 00:00:00 2001
From: "YIHAODIAN\\wangshuangshuang"
Date: Wed, 28 Jun 2017 14:11:37 +0800
Subject: [PATCH 07/11] Spark-19726: alwaysNullable as a parameter for getSchema

---
 .../execution/datasources/jdbc/JDBCRDD.scala  |  2 +-
 .../datasources/jdbc/JdbcUtils.scala          | 30 +++++++++----------
 .../spark/sql/jdbc/JDBCWriteSuite.scala       |  5 ++--
 3 files changed, 18 insertions(+), 19 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 2bdc43254133e..f04a3adbe88ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -61,7 +61,7 @@ object JDBCRDD extends Logging {
     try {
       val rs = statement.executeQuery()
       try {
-        JdbcUtils.getSchema(rs, dialect)
+        JdbcUtils.getSchema(rs, dialect, true)
       } finally {
         rs.close()
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index a3e351c70dd86..a2acf70b87a57 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -264,19 +264,19 @@ object JdbcUtils extends Logging {
   }

   /**
-   * Takes a [[ResultSet]] and returns its Catalyst schema.
-   *
-   * @return A [[StructType]] giving the Catalyst schema.
-   * @throws SQLException if the schema contains an unsupported type.
-   */
-  def getSchema(resultSet: ResultSet, dialect: JdbcDialect): StructType = {
+    * Takes a [[ResultSet]] and returns its Catalyst schema.
+    *
+    * @param alwaysNullable - If true, spark will propagate null
+    *                       to underlying DB engine instead of using type default value
+    * @return A [[StructType]] giving the Catalyst schema.
+    * @throws SQLException if the schema contains an unsupported type.
+    */
+  def getSchema(resultSet: ResultSet,
+                dialect: JdbcDialect,
+                alwaysNullable: Boolean = false): StructType = {
     val rsmd = resultSet.getMetaData
     val ncols = rsmd.getColumnCount
     val fields = new Array[StructField](ncols)
-    // if true, spark will propagate null to underlying
-    // DB engine instead of using type default value
-    val alwaysNullable = true
-
     var i = 0
     while (i < ncols) {
       val columnName = rsmd.getColumnLabel(i + 1)
@@ -294,12 +294,10 @@ object JdbcUtils extends Logging {
             rsmd.getClass.getName == "org.apache.hive.jdbc.HiveResultSetMetaData" => true
         }
       }
-      val nullable = {
-        if (alwaysNullable)
-          alwaysNullable
-        else
-          rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
-      }
+      val nullable = if (alwaysNullable)
+        alwaysNullable
+      else
+        rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
       val metadata = new MetadataBuilder()
         .putString("name", columnName)
         .putLong("scale", fieldScale)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index 169aaf6c1c6f5..92f50a095f19b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -20,10 +20,11 @@ package org.apache.spark.sql.jdbc
 import java.sql.{Date, DriverManager, Timestamp}
 import java.util.Properties

-import org.apache.spark.SparkException
-
 import scala.collection.JavaConverters.propertiesAsScalaMapConverter
+
 import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
From b3b5bca416d98b36e41894de1fe89c40dfd1bc48 Mon Sep 17 00:00:00 2001
From: "YIHAODIAN\\wangshuangshuang"
Date: Wed, 28 Jun 2017 14:38:06 +0800
Subject: [PATCH 08/11] Spark-19726: fix scala style check errors according to the scala-style-guide

---
 .../execution/datasources/jdbc/JDBCRDD.scala  |  2 +-
 .../datasources/jdbc/JdbcUtils.scala          | 25 ++++++++++---------
 2 files changed, 14 insertions(+), 13 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index f04a3adbe88ae..2bdc43254133e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -61,7 +61,7 @@ object JDBCRDD extends Logging {
     try {
       val rs = statement.executeQuery()
       try {
-        JdbcUtils.getSchema(rs, dialect, true)
+        JdbcUtils.getSchema(rs, dialect)
       } finally {
         rs.close()
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index a2acf70b87a57..f98aec4d5f5e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -264,16 +264,16 @@ object JdbcUtils extends Logging {
   }

   /**
-    * Takes a [[ResultSet]] and returns its Catalyst schema.
-    *
-    * @param alwaysNullable - If true, spark will propagate null
-    *                       to underlying DB engine instead of using type default value
-    * @return A [[StructType]] giving the Catalyst schema.
-    * @throws SQLException if the schema contains an unsupported type.
-    */
-  def getSchema(resultSet: ResultSet,
-                dialect: JdbcDialect,
-                alwaysNullable: Boolean = false): StructType = {
+   * Takes a [[ResultSet]] and returns its Catalyst schema.
+   *
+   * @param alwaysNullable If true, all the columns are nullable.
+   * @return A [[StructType]] giving the Catalyst schema.
+   * @throws SQLException if the schema contains an unsupported type.
+   */
+  def getSchema(
+      resultSet: ResultSet,
+      dialect: JdbcDialect,
+      alwaysNullable: Boolean = true): StructType = {
     val rsmd = resultSet.getMetaData
     val ncols = rsmd.getColumnCount
     val fields = new Array[StructField](ncols)
@@ -294,10 +294,11 @@ object JdbcUtils extends Logging {
             rsmd.getClass.getName == "org.apache.hive.jdbc.HiveResultSetMetaData" => true
         }
       }
-      val nullable = if (alwaysNullable)
-        alwaysNullable
-      else
-        rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
+      val nullable = if (alwaysNullable) {
+        alwaysNullable
+      } else {
+        rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
+      }
       val metadata = new MetadataBuilder()
         .putString("name", columnName)
         .putLong("scale", fieldScale)
From 637ca0e4f3f69509e1acae1fce1f1af7ec600fc5 Mon Sep 17 00:00:00 2001
From: Shuangshuang Wang
Date: Fri, 30 Jun 2017 18:20:35 +0800
Subject: [PATCH 09/11] SPARK-19726: change the value to true on the caller side

---
 .../apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala   | 2 +-
 .../apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 2bdc43254133e..f04a3adbe88ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -61,7 +61,7 @@ object JDBCRDD extends Logging {
     try {
       val rs = statement.executeQuery()
       try {
-        JdbcUtils.getSchema(rs, dialect)
+        JdbcUtils.getSchema(rs, dialect, true)
       } finally {
         rs.close()
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index f98aec4d5f5e1..a4ac8e2b8c2d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -273,7 +273,7 @@ object JdbcUtils extends Logging {
   def getSchema(
       resultSet: ResultSet,
       dialect: JdbcDialect,
-      alwaysNullable: Boolean = true): StructType = {
+      alwaysNullable: Boolean = false): StructType = {
     val rsmd = resultSet.getMetaData
     val ncols = rsmd.getColumnCount
     val fields = new Array[StructField](ncols)

From 718e9497b060796b46dd0afd00b30ece6adbd188 Mon Sep 17 00:00:00 2001
From: Shuangshuang Wang
Date: Mon, 3 Jul 2017 10:18:22 +0800
Subject: [PATCH 10/11] SPARK-19726: pass alwaysNullable = true by name in JdbcUtils.getSchema

---
 .../apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index f04a3adbe88ae..be91c4a535e90 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -61,7 +61,7 @@ object JDBCRDD extends Logging {
     try {
       val rs = statement.executeQuery()
       try {
-        JdbcUtils.getSchema(rs, dialect, true)
+        JdbcUtils.getSchema(rs, dialect, alwaysNullable = true)
       } finally {
         rs.close()
       }
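
Revisions 09 and 10 settle the API shape: getSchema keeps a conservative default of alwaysNullable = false for other callers, and JDBCRDD opts in explicitly, passing the boolean by name so the intent is visible at the call site. In miniature (a standalone sketch; the names are illustrative):

    def resolve(alwaysNullable: Boolean = false): String =
      if (alwaysNullable) "all columns nullable" else "driver-reported nullability"

    resolve()                       // other callers: default behaviour
    resolve(alwaysNullable = true)  // JDBCRDD: named boolean literal, easier to read
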
From c071c2f9144672487c15bd624d0d5f47130e1f08 Mon Sep 17 00:00:00 2001
From: Shuangshuang Wang
Date: Tue, 4 Jul 2017 16:56:08 +0800
Subject: [PATCH 11/11] SPARK-19726: alwaysNullable and nullable are different

---
 .../apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index a4ac8e2b8c2d8..55b2539c13381 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -295,7 +295,7 @@ object JdbcUtils extends Logging {
         }
       }
       val nullable = if (alwaysNullable) {
-        alwaysNullable
+        true
       } else {
         rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
       }
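
Net effect of the series: JDBCRDD.resolveTable now calls JdbcUtils.getSchema(rs, dialect, alwaysNullable = true), so the schema Spark uses for a JDBC relation treats every column as nullable, NULLs are written through to the database, and a NOT NULL column rejects them with the database's own error (the H2 message asserted in JDBCWriteSuite). The same rule can be exercised against plain JDBC metadata; a minimal standalone sketch (object and method names are illustrative, not Spark API):

    import java.sql.{DriverManager, ResultSetMetaData}

    object NullabilityProbe {
      // Reports each column of the query's result set with the nullability
      // the patched getSchema would assign: all-true when alwaysNullable is
      // set, otherwise whatever the driver's metadata says.
      def columnNullability(
          url: String,
          query: String,
          alwaysNullable: Boolean): Seq[(String, Boolean)] = {
        val conn = DriverManager.getConnection(url)
        try {
          val rsmd = conn.prepareStatement(query).executeQuery().getMetaData
          (1 to rsmd.getColumnCount).map { i =>
            val nullable = alwaysNullable ||
              rsmd.isNullable(i) != ResultSetMetaData.columnNoNulls
            rsmd.getColumnLabel(i) -> nullable
          }
        } finally {
          conn.close()
        }
      }
    }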