From 0380bbfc5f7d3f5c8e15446ad284b69f90bbdfff Mon Sep 17 00:00:00 2001 From: sureshthalamati Date: Fri, 2 Dec 2016 15:22:17 -0800 Subject: [PATCH 1/5] Add a new JDBC option to allow users to specify create table column types when the table is created on write --- docs/sql-programming-guide.md | 7 +++ .../sql/JavaSQLDataSourceExample.java | 9 ++++ examples/src/main/python/sql/datasource.py | 6 +++ .../examples/sql/SQLDataSourceExample.scala | 7 +++ .../datasources/jdbc/JDBCOptions.scala | 2 + .../datasources/jdbc/JdbcUtils.scala | 16 ++++-- .../spark/sql/jdbc/JDBCWriteSuite.scala | 53 ++++++++++++++++++- 7 files changed, 96 insertions(+), 4 deletions(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index b077575155eb..6deb214ed431 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1223,6 +1223,13 @@ the following case-insensitive options: This is a JDBC writer related option. If specified, this option allows setting of database-specific table and partition options when creating a table (e.g., CREATE TABLE t (name string) ENGINE=InnoDB.). This option applies only to writing. + + + createTableColumnTypes + + The database column data types to use instead of the defaults, when creating the table. Data type information should be specified as key(column name)-value(data type) pairs in JSON (e.g., {"name":"varchar(128)", "comments":"clob(20k)"}). You can use org.apache.spark.sql.types.MetadataBuilder to build the metadata and generate the JSON string required for this option. This option applies only to writing. + +
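For quick reference, a minimal usage sketch of the option as documented at this point in the series (a later patch in this series switches the value format to DDL-style text). The SparkSession setup, the sample DataFrame, and the PostgreSQL URL and credentials are placeholders modeled on the surrounding examples; the closing comment describes the intended effect rather than verified output.

import java.util.Properties

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.MetadataBuilder

val spark = SparkSession.builder().appName("createTableColumnTypesSketch").getOrCreate()
import spark.implicits._

// Sample data; the column names match the ones used in the option value below.
val jdbcDF = Seq((1, "dave", "electric cars")).toDF("id", "name", "comments")

// Build the JSON value for the option, as the documentation above suggests.
val columnTypes = new MetadataBuilder()
  .putString("name", "varchar(128)")
  .putString("comments", "clob(20k)")
  .build().json

val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")

jdbcDF.write
  .option("createTableColumnTypes", columnTypes)
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Intended effect: "name" and "comments" are created with the types given above,
// while "id" keeps the default type chosen by the JDBC dialect.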
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java index 82bb284ea3e5..21c05ec6604e 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java @@ -35,6 +35,9 @@ // $example off:schema_merging$ // $example off:basic_parquet_example$ import org.apache.spark.sql.SparkSession; +// $example on:jdbc_dataset$ +import org.apache.spark.sql.types.MetadataBuilder; +// $example off:jdbc_dataset$ public class JavaSQLDataSourceExample { @@ -258,6 +261,12 @@ private static void runJdbcDatasetExample(SparkSession spark) { jdbcDF2.write() .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties); + + // Specifying create table column data types on write + String columnTypes = new MetadataBuilder().putString("name", "VARCHAR(128)").build().json(); + jdbcDF.write() + .option("createTableColumnTypes", columnTypes) + .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties); // $example off:jdbc_dataset$ } } diff --git a/examples/src/main/python/sql/datasource.py b/examples/src/main/python/sql/datasource.py index e9aa9d9ac258..1a29f80ad255 100644 --- a/examples/src/main/python/sql/datasource.py +++ b/examples/src/main/python/sql/datasource.py @@ -169,6 +169,12 @@ def jdbc_dataset_example(spark): jdbcDF2.write \ .jdbc("jdbc:postgresql:dbserver", "schema.tablename", properties={"user": "username", "password": "password"}) + + # Specifying create table column data types on write + jdbcDF.write \ + .option("createTableColumnTypes", "{\"name\":\"VARCHAR(128)\"}") \ + .jdbc("jdbc:postgresql:dbserver", "schema.tablename", + properties={"user": "username", "password": "password"}) # $example off:jdbc_dataset$ diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala index 381e69cda841..ea026ff7b71b 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala @@ -19,6 +19,7 @@ package org.apache.spark.examples.sql import java.util.Properties import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.types.MetadataBuilder object SQLDataSourceExample { @@ -181,6 +182,12 @@ object SQLDataSourceExample { jdbcDF2.write .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties) + + // Specifying create table column data types on write + val createTableColTypes = new MetadataBuilder().putString("name", "VARCHAR(128)").build().json + jdbcDF.write + .option("createTableColumnTypes", createTableColTypes) + .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties) // $example off:jdbc_dataset$ } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala index d4d34646545b..89fe86c038b1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala @@ -119,6 +119,7 @@ class JDBCOptions( // E.g., "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8" // TODO: to reuse the existing 
partition parameters for those partition specific options val createTableOptions = parameters.getOrElse(JDBC_CREATE_TABLE_OPTIONS, "") + val createTableColumnTypes = parameters.get(JDBC_CREATE_TABLE_COLUMN_TYPES) val batchSize = { val size = parameters.getOrElse(JDBC_BATCH_INSERT_SIZE, "1000").toInt require(size >= 1, @@ -154,6 +155,7 @@ object JDBCOptions { val JDBC_BATCH_FETCH_SIZE = newOption("fetchsize") val JDBC_TRUNCATE = newOption("truncate") val JDBC_CREATE_TABLE_OPTIONS = newOption("createTableOptions") + val JDBC_CREATE_TABLE_COLUMN_TYPES = newOption("createTableColumnTypes") val JDBC_BATCH_INSERT_SIZE = newOption("batchsize") val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index d89f60087417..ebe3f6e5b515 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -680,12 +680,21 @@ object JdbcUtils extends Logging { /** * Compute the schema string for this RDD. */ - def schemaString(schema: StructType, url: String): String = { + def schemaString( + schema: StructType, + url: String, + createTableColumnTypes: Option[String] = None): String = { val sb = new StringBuilder() val dialect = JdbcDialects.get(url) + val dbColumnTypeMetadata = + createTableColumnTypes.map(Metadata.fromJson).getOrElse(Metadata.empty) schema.fields foreach { field => val name = dialect.quoteIdentifier(field.name) - val typ: String = getJdbcType(field.dataType, dialect).databaseTypeDefinition + val typ: String = if (dbColumnTypeMetadata.contains(field.name)) { + dbColumnTypeMetadata.getString(field.name) + } else { + getJdbcType(field.dataType, dialect).databaseTypeDefinition + } val nullable = if (field.nullable) "" else "NOT NULL" sb.append(s", $name $typ $nullable") } @@ -728,9 +737,10 @@ object JdbcUtils extends Logging { conn: Connection, schema: StructType, options: JDBCOptions): Unit = { - val strSchema = schemaString(schema, options.url) val table = options.table val createTableOptions = options.createTableOptions + val createTableColumnTypes = options.createTableColumnTypes + val strSchema = schemaString(schema, options.url, createTableColumnTypes) // Create the table if the table does not exist. // To allow certain options to append when create a new table, which can be // table_options or partition_options. 
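To make the new code path in schemaString concrete, here is a small self-contained sketch of the lookup it performs: a type found in the user-supplied metadata wins, otherwise the dialect's default mapping is used. The defaultType function is only a stand-in for getJdbcType(...).databaseTypeDefinition with H2-like fallbacks, and dialect.quoteIdentifier is approximated with plain double quotes; neither is Spark's actual dialect code.

import org.apache.spark.sql.types._

// Stand-in for the dialect mapping (getJdbcType(...).databaseTypeDefinition); the values are
// illustrative, roughly matching what the H2 dialect produces in the test suite changes below.
def defaultType(dt: DataType): String = dt match {
  case IntegerType => "INTEGER"
  case StringType  => "TEXT"
  case _           => "VARCHAR(255)"
}

// The rule this patch adds: use the user-specified type if present, otherwise the default.
def resolveColumnType(field: StructField, userTypes: Metadata): String =
  if (userTypes.contains(field.name)) userTypes.getString(field.name)
  else defaultType(field.dataType)

val userTypes = Metadata.fromJson("""{"name":"NVARCHAR(123)"}""")
val schema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("name", StringType),
  StructField("descr", StringType)))

schema.fields.map(f => s""""${f.name}" ${resolveColumnType(f, userTypes)}""").mkString(", ")
// expected: "id" INTEGER, "name" NVARCHAR(123), "descr" TEXT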
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala index ec7b19e666ec..3d0bd1188095 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala @@ -25,7 +25,7 @@ import scala.collection.JavaConverters.propertiesAsScalaMapConverter import org.scalatest.BeforeAndAfter import org.apache.spark.sql.{AnalysisException, Row, SaveMode} -import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions +import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -362,4 +362,55 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { assert(sql("select * from people_view").count() == 2) } } + + test("SPARK-10849: create table using user specified column type.") { + val data = Seq[Row]( + Row(1, "dave", "Boston", "electric cars"), + Row(2, "mary", "boston", "building planes") + ) + val schema = StructType( + StructField("id", IntegerType) :: + StructField("name", StringType) :: + StructField("city", StringType) :: + StructField("descr", StringType) :: + Nil) + val df = spark.createDataFrame(sparkContext.parallelize(data), schema) + + // Using metadata builder to generate metadata json string for create table column types. + val mdb = new MetadataBuilder() + mdb.putString("name", "NVARCHAR(123)") + // Use H2 varchar_ignorecase type instead of TEXT to perform case-insensitive comparisions + mdb.putString("city", "VARCHAR_IGNORECASE(20)") + val createTableColTypes = mdb.build().json + assert(JdbcUtils.schemaString(df.schema, url1, Option(createTableColTypes)) == + s""""id" INTEGER , "name" NVARCHAR(123) , "city" VARCHAR_IGNORECASE(20) , "descr" TEXT """) + // create the table with the user specified data types, and verify the data + df.write.option("createTableColumnTypes", createTableColTypes) + .jdbc(url1, "TEST.DBCOLTYPETEST", properties) + assert(spark.read.jdbc(url1, + """(select * from TEST.DBCOLTYPETEST where "city"='Boston')""", properties).count == 2) + } + + test("SPARK-10849: jdbcCreateTableColumnTypes option with invalid data type") { + val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + val invalidCreateTableColTypes = + new MetadataBuilder().putString("name", "INVALID(123)").build().json + val msg = intercept[org.h2.jdbc.JdbcSQLException] { + df.write.mode(SaveMode.Overwrite) + .option("createTableColumnTypes", invalidCreateTableColTypes) + .jdbc(url1, "TEST.USERDBTYPETEST", properties) + }.getMessage() + assert(msg.contains("Unknown data type: \"INVALID\"")) + } + + test("SPARK-10849: jdbcCreateTableColumnTypes option with invalid json string") { + val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + val msg = intercept[com.fasterxml.jackson.core.JsonParseException] { + df.write.mode(SaveMode.Overwrite) + .option("createTableColumnTypes", """{"name":"NVARCHAR(12)"""") + .jdbc(url1, "TEST.USERDBTYPETEST", properties) + }.getMessage() + assert( + msg.contains("expected close marker for OBJECT (from [Source: {\"name\":\"NVARCHAR(12)\";")) + } } From 6d1b4f8d5c2e46e58ee20ca34e6afe299c22567b Mon Sep 17 00:00:00 2001 From: sureshthalamati Date: Wed, 7 Dec 2016 23:56:10 -0800 Subject: [PATCH 2/5] fix for python style check error --- examples/src/main/python/sql/datasource.py | 2 
+- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/src/main/python/sql/datasource.py b/examples/src/main/python/sql/datasource.py index 1a29f80ad255..3c5f04fc1ef2 100644 --- a/examples/src/main/python/sql/datasource.py +++ b/examples/src/main/python/sql/datasource.py @@ -174,7 +174,7 @@ def jdbc_dataset_example(spark): jdbcDF.write \ .option("createTableColumnTypes", "{\"name\":\"VARCHAR(128)\"}") \ .jdbc("jdbc:postgresql:dbserver", "schema.tablename", - properties={"user": "username", "password": "password"}) + properties={"user": "username", "password": "password"}) # $example off:jdbc_dataset$ From 64eb5058f4acb4616fd5f1033b04f22630641d75 Mon Sep 17 00:00:00 2001 From: sureshthalamati Date: Mon, 20 Mar 2017 18:46:07 -0700 Subject: [PATCH 3/5] Changing the createTableColumnTypes option value format to DDL schema format --- docs/sql-programming-guide.md | 2 +- .../sql/JavaSQLDataSourceExample.java | 6 +- examples/src/main/python/sql/datasource.py | 2 +- .../examples/sql/SQLDataSourceExample.scala | 4 +- .../datasources/jdbc/JdbcUtils.scala | 53 +++++++++++--- .../spark/sql/jdbc/JDBCWriteSuite.scala | 70 +++++++++++++------ 6 files changed, 96 insertions(+), 41 deletions(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 6deb214ed431..7ae9847983d4 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1227,7 +1227,7 @@ the following case-insensitive options: createTableColumnTypes - The database column data types to use instead of the defaults, when creating the table. Data type information should be specified as key(column name)-value(data type) pairs in JSON (e.g: {"name":"varchar(128)", "comments":"clob(20k)"}). You can use org.apache.spark.sql.types.MetadataBuilder to build the metadata and generate the JSON string required for this option. This option applies only to writing. + The database column data types to use instead of the defaults, when creating the table. Data type information should be specified in the same format as CREATE TABLE columns syntax (e.g: "name CHAR(64), comments VARCHAR(1024)"). The specified types should be valid spark sql data types. This option applies only to writing. 
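To see what the new DDL-style value format implies, the sketch below parses the documented example with CatalystSqlParser.parseTableSchema, the same parser the JdbcUtils change later in this patch relies on. The commented results are the expected ones, not captured output.

import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// Parse the documented example value the same way the JdbcUtils change below does.
val userSchema = CatalystSqlParser.parseTableSchema("name CHAR(64), comments VARCHAR(1024)")

userSchema.fieldNames.toSeq
// expected: Seq("name", "comments")

userSchema.fields.map(_.dataType.catalogString).toSeq
// expected: Seq("string", "string") -- char/varchar are surfaced as string, and the original
// CHAR(64)/VARCHAR(1024) text is kept in the field metadata (HIVE_TYPE_STRING), which is what
// the writer reads back when it builds the CREATE TABLE statement.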
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java index 21c05ec6604e..1a7054614b34 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java @@ -35,9 +35,6 @@ // $example off:schema_merging$ // $example off:basic_parquet_example$ import org.apache.spark.sql.SparkSession; -// $example on:jdbc_dataset$ -import org.apache.spark.sql.types.MetadataBuilder; -// $example off:jdbc_dataset$ public class JavaSQLDataSourceExample { @@ -263,9 +260,8 @@ private static void runJdbcDatasetExample(SparkSession spark) { .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties); // Specifying create table column data types on write - String columnTypes = new MetadataBuilder().putString("name", "VARCHAR(128)").build().json(); jdbcDF.write() - .option("createTableColumnTypes", columnTypes) + .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties); // $example off:jdbc_dataset$ } diff --git a/examples/src/main/python/sql/datasource.py b/examples/src/main/python/sql/datasource.py index 3c5f04fc1ef2..e4abb0933345 100644 --- a/examples/src/main/python/sql/datasource.py +++ b/examples/src/main/python/sql/datasource.py @@ -172,7 +172,7 @@ def jdbc_dataset_example(spark): # Specifying create table column data types on write jdbcDF.write \ - .option("createTableColumnTypes", "{\"name\":\"VARCHAR(128)\"}") \ + .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \ .jdbc("jdbc:postgresql:dbserver", "schema.tablename", properties={"user": "username", "password": "password"}) # $example off:jdbc_dataset$ diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala index ea026ff7b71b..82fd56de3984 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala @@ -19,7 +19,6 @@ package org.apache.spark.examples.sql import java.util.Properties import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.types.MetadataBuilder object SQLDataSourceExample { @@ -184,9 +183,8 @@ object SQLDataSourceExample { .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties) // Specifying create table column data types on write - val createTableColTypes = new MetadataBuilder().putString("name", "VARCHAR(128)").build().json jdbcDF.write - .option("createTableColumnTypes", createTableColTypes) + .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties) // $example off:jdbc_dataset$ } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index ebe3f6e5b515..8b4952b464b5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, Row} import 
org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.util.{DateTimeUtils, GenericArrayData} import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType} import org.apache.spark.sql.types._ @@ -686,21 +687,56 @@ object JdbcUtils extends Logging { createTableColumnTypes: Option[String] = None): String = { val sb = new StringBuilder() val dialect = JdbcDialects.get(url) - val dbColumnTypeMetadata = - createTableColumnTypes.map(Metadata.fromJson).getOrElse(Metadata.empty) + val userSpecifiedColTypesMap = createTableColumnTypes + .map(parseUserSpecifiedCreateTableColumnTypes(schema, _)) + .getOrElse(Map.empty[String, String]) schema.fields foreach { field => val name = dialect.quoteIdentifier(field.name) - val typ: String = if (dbColumnTypeMetadata.contains(field.name)) { - dbColumnTypeMetadata.getString(field.name) - } else { - getJdbcType(field.dataType, dialect).databaseTypeDefinition - } + val typ: String = userSpecifiedColTypesMap.get(field.name) + .getOrElse(getJdbcType(field.dataType, dialect).databaseTypeDefinition) val nullable = if (field.nullable) "" else "NOT NULL" sb.append(s", $name $typ $nullable") } if (sb.length < 2) "" else sb.substring(2) } + /** + * Parses the user specified createTableColumnTypes option value string specified in the same + * format as create table ddl column types, and returns Map of field name and the data type to + * use in-place of the default data type. + */ + private def parseUserSpecifiedCreateTableColumnTypes(schema: StructType, + createTableColumnTypes: String): Map[String, String] = { + val userSchema = CatalystSqlParser.parseTableSchema(createTableColumnTypes) + val userColNames = userSchema.fieldNames + // check duplicate columns in the user specified column types. + if (userColNames.distinct.length != userColNames.length) { + val duplicates = userColNames.groupBy(identity).collect { + case (x, ys) if ys.length > 1 => x + }.mkString(", ") + throw new AnalysisException( + s"Found duplicate column(s) in createTableColumnTypes option value: $duplicates") + } + // check user specified column names exists in the data frame schema. + val commonNames = userColNames.intersect(schema.fieldNames) + if (commonNames.length != userColNames.length) { + val invalidColumns = userColNames.diff(commonNames).mkString(", ") + throw new AnalysisException( + s"Found invalid column(s) in createTableColumnTypes option value: $invalidColumns") + } + + // char/varchar gets translated to string type. Real data type specified by the user + // is available in the field metadata as HIVE_TYPE_STRING + userSchema.fields.map(f => + f.name -> { + (if (f.metadata.contains(HIVE_TYPE_STRING)) { + f.metadata.getString(HIVE_TYPE_STRING) + } else { + f.dataType.catalogString + }).toUpperCase + }).toMap + } + /** * Saves the RDD to the database in a single transaction. */ @@ -737,10 +773,9 @@ object JdbcUtils extends Logging { conn: Connection, schema: StructType, options: JDBCOptions): Unit = { + val strSchema = schemaString(schema, options.url, options.createTableColumnTypes) val table = options.table val createTableOptions = options.createTableOptions - val createTableColumnTypes = options.createTableColumnTypes - val strSchema = schemaString(schema, options.url, createTableColumnTypes) // Create the table if the table does not exist. 
// To allow certain options to append when create a new table, which can be // table_options or partition_options. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala index 3d0bd1188095..9a0de7f4b3e3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala @@ -25,6 +25,7 @@ import scala.collection.JavaConverters.propertiesAsScalaMapConverter import org.scalatest.BeforeAndAfter import org.apache.spark.sql.{AnalysisException, Row, SaveMode} +import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext @@ -366,51 +367,76 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { test("SPARK-10849: create table using user specified column type.") { val data = Seq[Row]( Row(1, "dave", "Boston", "electric cars"), - Row(2, "mary", "boston", "building planes") + Row(2, "mary", "Seattle", "building planes") ) val schema = StructType( StructField("id", IntegerType) :: - StructField("name", StringType) :: + StructField("first#name", StringType) :: StructField("city", StringType) :: StructField("descr", StringType) :: Nil) val df = spark.createDataFrame(sparkContext.parallelize(data), schema) - - // Using metadata builder to generate metadata json string for create table column types. - val mdb = new MetadataBuilder() - mdb.putString("name", "NVARCHAR(123)") - // Use H2 varchar_ignorecase type instead of TEXT to perform case-insensitive comparisions - mdb.putString("city", "VARCHAR_IGNORECASE(20)") - val createTableColTypes = mdb.build().json + // Use database specific CHAR/VARCHAR types instead of String data type. 
+ val createTableColTypes = "`first#name` VARCHAR(123), city CHAR(20)" assert(JdbcUtils.schemaString(df.schema, url1, Option(createTableColTypes)) == - s""""id" INTEGER , "name" NVARCHAR(123) , "city" VARCHAR_IGNORECASE(20) , "descr" TEXT """) + s""""id" INTEGER , "first#name" VARCHAR(123) , "city" CHAR(20) , "descr" TEXT """) + // create the table with the user specified data types, and verify the data df.write.option("createTableColumnTypes", createTableColTypes) .jdbc(url1, "TEST.DBCOLTYPETEST", properties) assert(spark.read.jdbc(url1, - """(select * from TEST.DBCOLTYPETEST where "city"='Boston')""", properties).count == 2) + """(select * from TEST.DBCOLTYPETEST where "city" ='Boston')""", properties).count() == 1) + + // verify the data types on the target table + val rows = spark.read.jdbc(url1, + """ + |(select type_name, CHARACTER_MAXIMUM_LENGTH + | from information_schema.COLUMNS where table_name = 'DBCOLTYPETEST') + """.stripMargin.replaceAll("\n", " "), + properties + ) + assert(rows.where("TYPE_NAME='VARCHAR'").head.getInt(1) == 123) + assert(rows.where("TYPE_NAME='CHAR'").head.getInt(1) == 20) } - test("SPARK-10849: jdbcCreateTableColumnTypes option with invalid data type") { + test("SPARK-10849: jdbc CreateTableColumnTypes option with invalid data type") { val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) - val invalidCreateTableColTypes = - new MetadataBuilder().putString("name", "INVALID(123)").build().json - val msg = intercept[org.h2.jdbc.JdbcSQLException] { + val msg = intercept[ParseException] { df.write.mode(SaveMode.Overwrite) - .option("createTableColumnTypes", invalidCreateTableColTypes) + .option("createTableColumnTypes", "name CLOB(2000)") .jdbc(url1, "TEST.USERDBTYPETEST", properties) }.getMessage() - assert(msg.contains("Unknown data type: \"INVALID\"")) + assert(msg.contains("DataType clob(2000) is not supported.")) } - test("SPARK-10849: jdbcCreateTableColumnTypes option with invalid json string") { + test("SPARK-10849: jdbc CreateTableColumnTypes option with invalid syntax") { val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) - val msg = intercept[com.fasterxml.jackson.core.JsonParseException] { + val msg = intercept[ParseException] { df.write.mode(SaveMode.Overwrite) - .option("createTableColumnTypes", """{"name":"NVARCHAR(12)"""") + .option("createTableColumnTypes", "`name char(20)") .jdbc(url1, "TEST.USERDBTYPETEST", properties) }.getMessage() - assert( - msg.contains("expected close marker for OBJECT (from [Source: {\"name\":\"NVARCHAR(12)\";")) + assert(msg.contains("no viable alternative at input")) + } + + test("SPARK-10849: jdbc CreateTableColumnTypes duplicate columns") { + val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + val msg = intercept[AnalysisException] { + df.write.mode(SaveMode.Overwrite) + .option("createTableColumnTypes", "name CHAR(20), id int, name VARCHAR(100)") + .jdbc(url1, "TEST.USERDBTYPETEST", properties) + }.getMessage() + assert(msg.contains("Found duplicate column(s) in createTableColumnTypes option value: name")) + } + + test("SPARK-10849: jdbc CreateTableColumnTypes invalid columns") { + val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + val msg = intercept[AnalysisException] { + df.write.mode(SaveMode.Overwrite) + .option("createTableColumnTypes", "firstName CHAR(20), id int, lastName VARCHAR(100)") + .jdbc(url1, "TEST.USERDBTYPETEST", properties) + }.getMessage() + assert(msg.contains( + "Found invalid column(s) in 
createTableColumnTypes option value: firstName, lastName")) } } From d93d3fdde19d5fea88946951db99b242371c907e Mon Sep 17 00:00:00 2001 From: sureshthalamati Date: Wed, 22 Mar 2017 15:24:26 -0700 Subject: [PATCH 4/5] Added case-sensitive handling to the user specied columnTypes string. Addressed review comments. --- .../jdbc/JdbcRelationProvider.scala | 4 +- .../datasources/jdbc/JdbcUtils.scala | 75 +++++---- .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 2 +- .../spark/sql/jdbc/JDBCWriteSuite.scala | 159 +++++++++++++----- 4 files changed, 158 insertions(+), 82 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala index 88f6cb002130..74dcfb06f5c2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala @@ -69,7 +69,7 @@ class JdbcRelationProvider extends CreatableRelationProvider } else { // Otherwise, do not truncate the table, instead drop and recreate it dropTable(conn, options.table) - createTable(conn, df.schema, options) + createTable(conn, df, options) saveTable(df, Some(df.schema), isCaseSensitive, options) } @@ -87,7 +87,7 @@ class JdbcRelationProvider extends CreatableRelationProvider // Therefore, it is okay to do nothing here and then just return the relation below. } } else { - createTable(conn, df.schema, options) + createTable(conn, df, options) saveTable(df, Some(df.schema), isCaseSensitive, options) } } finally { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 8b4952b464b5..60ec1cba5965 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow import org.apache.spark.sql.catalyst.parser.CatalystSqlParser -import org.apache.spark.sql.catalyst.util.{DateTimeUtils, GenericArrayData} +import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, GenericArrayData} import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType} import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -682,18 +682,18 @@ object JdbcUtils extends Logging { * Compute the schema string for this RDD. 
*/ def schemaString( - schema: StructType, + df: DataFrame, url: String, createTableColumnTypes: Option[String] = None): String = { val sb = new StringBuilder() val dialect = JdbcDialects.get(url) val userSpecifiedColTypesMap = createTableColumnTypes - .map(parseUserSpecifiedCreateTableColumnTypes(schema, _)) + .map(parseUserSpecifiedCreateTableColumnTypes(df, _)) .getOrElse(Map.empty[String, String]) - schema.fields foreach { field => + df.schema.fields.foreach { field => val name = dialect.quoteIdentifier(field.name) - val typ: String = userSpecifiedColTypesMap.get(field.name) - .getOrElse(getJdbcType(field.dataType, dialect).databaseTypeDefinition) + val typ = userSpecifiedColTypesMap + .getOrElse(field.name, getJdbcType(field.dataType, dialect).databaseTypeDefinition) val nullable = if (field.nullable) "" else "NOT NULL" sb.append(s", $name $typ $nullable") } @@ -705,36 +705,44 @@ object JdbcUtils extends Logging { * format as create table ddl column types, and returns Map of field name and the data type to * use in-place of the default data type. */ - private def parseUserSpecifiedCreateTableColumnTypes(schema: StructType, + private def parseUserSpecifiedCreateTableColumnTypes( + df: DataFrame, createTableColumnTypes: String): Map[String, String] = { + def typeName(f: StructField): String = { + // char/varchar gets translated to string type. Real data type specified by the user + // is available in the field metadata as HIVE_TYPE_STRING + if (f.metadata.contains(HIVE_TYPE_STRING)) { + f.metadata.getString(HIVE_TYPE_STRING) + } else { + f.dataType.catalogString + } + } + val userSchema = CatalystSqlParser.parseTableSchema(createTableColumnTypes) - val userColNames = userSchema.fieldNames - // check duplicate columns in the user specified column types. - if (userColNames.distinct.length != userColNames.length) { - val duplicates = userColNames.groupBy(identity).collect { - case (x, ys) if ys.length > 1 => x - }.mkString(", ") - throw new AnalysisException( - s"Found duplicate column(s) in createTableColumnTypes option value: $duplicates") + val nameEquality = df.sparkSession.sessionState.conf.resolver + + // checks duplicate columns in the user specified column types. + userSchema.fieldNames.foreach { col => + val duplicatesCols = userSchema.fieldNames.filter(nameEquality(_, col)) + if (duplicatesCols.size >= 2) { + throw new AnalysisException( + "Found duplicate column(s) in createTableColumnTypes option value: " + + duplicatesCols.mkString(", ")) + } } - // check user specified column names exists in the data frame schema. - val commonNames = userColNames.intersect(schema.fieldNames) - if (commonNames.length != userColNames.length) { - val invalidColumns = userColNames.diff(commonNames).mkString(", ") - throw new AnalysisException( - s"Found invalid column(s) in createTableColumnTypes option value: $invalidColumns") + + // checks if user specified column names exist in the DataFrame schema + userSchema.fieldNames.foreach { col => + df.schema.find(f => nameEquality(f.name, col)).getOrElse { + throw new AnalysisException( + s"createTableColumnTypes option column $col not found in schema " + + df.schema.catalogString) + } } - // char/varchar gets translated to string type. 
Real data type specified by the user - // is available in the field metadata as HIVE_TYPE_STRING - userSchema.fields.map(f => - f.name -> { - (if (f.metadata.contains(HIVE_TYPE_STRING)) { - f.metadata.getString(HIVE_TYPE_STRING) - } else { - f.dataType.catalogString - }).toUpperCase - }).toMap + val userSchemaMap = userSchema.fields.map(f => f.name -> typeName(f)).toMap + val isCaseSensitive = df.sparkSession.sessionState.conf.caseSensitiveAnalysis + if (isCaseSensitive) userSchemaMap else CaseInsensitiveMap(userSchemaMap) } /** @@ -771,9 +779,10 @@ object JdbcUtils extends Logging { */ def createTable( conn: Connection, - schema: StructType, + df: DataFrame, options: JDBCOptions): Unit = { - val strSchema = schemaString(schema, options.url, options.createTableColumnTypes) + val strSchema = schemaString( + df, options.url, options.createTableColumnTypes) val table = options.table val createTableOptions = options.createTableOptions // Create the table if the table does not exist. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 5463728ca0c1..4a02277631f1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -869,7 +869,7 @@ class JDBCSuite extends SparkFunSuite test("SPARK-16387: Reserved SQL words are not escaped by JDBC writer") { val df = spark.createDataset(Seq("a", "b", "c")).toDF("order") - val schema = JdbcUtils.schemaString(df.schema, "jdbc:mysql://localhost:3306/temp") + val schema = JdbcUtils.schemaString(df, "jdbc:mysql://localhost:3306/temp") assert(schema.contains("`order` TEXT")) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala index 9a0de7f4b3e3..57860e04b17d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala @@ -17,14 +17,14 @@ package org.apache.spark.sql.jdbc -import java.sql.DriverManager +import java.sql.{Date, DriverManager, Timestamp} import java.util.Properties import scala.collection.JavaConverters.propertiesAsScalaMapConverter import org.scalatest.BeforeAndAfter -import org.apache.spark.sql.{AnalysisException, Row, SaveMode} +import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode} import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils} import org.apache.spark.sql.internal.SQLConf @@ -364,39 +364,89 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { } } - test("SPARK-10849: create table using user specified column type.") { - val data = Seq[Row]( - Row(1, "dave", "Boston", "electric cars"), - Row(2, "mary", "Seattle", "building planes") - ) + test("SPARK-10849: test schemaString - from createTableColumnTypes option values") { + def testCreateTableColDataTypes(types: Seq[String]): Unit = { + val colTypes = types.zipWithIndex.map { case (t, i) => (s"col$i", t) } + val schema = colTypes + .foldLeft(new StructType())((schema, colType) => schema.add(colType._1, colType._2)) + val createTableColTypes = + colTypes.map { case (col, dataType) => s"$col $dataType" }.mkString(", ") + val df = spark.createDataFrame(sparkContext.parallelize(Seq(Row.empty)), schema) + + val expectedSchemaStr = + colTypes.map { case (col, dataType) => s""""$col" 
$dataType """ }.mkString(", ") + + assert(JdbcUtils.schemaString(df, url1, Option(createTableColTypes)) == expectedSchemaStr) + } + + testCreateTableColDataTypes(Seq("boolean")) + testCreateTableColDataTypes(Seq("tinyint", "smallint", "int", "bigint")) + testCreateTableColDataTypes(Seq("float", "double")) + testCreateTableColDataTypes(Seq("string", "char(10)", "varchar(20)")) + testCreateTableColDataTypes(Seq("decimal(10,0)", "decimal(10,5)")) + testCreateTableColDataTypes(Seq("date", "timestamp")) + testCreateTableColDataTypes(Seq("binary")) + } + + test("SPARK-10849: create table using user specified column type and verify on target table") { + def testUserSpecifiedColTypes( + df: DataFrame, + createTableColTypes: String, + expectedTypes: Map[String, String]): Unit = { + df.write + .mode(SaveMode.Overwrite) + .option("createTableColumnTypes", createTableColTypes) + .jdbc(url1, "TEST.DBCOLTYPETEST", properties) + + // verify the data types of the created table by reading the database catalog of H2 + val query = + """ + |(SELECT column_name, type_name, character_maximum_length + | FROM information_schema.columns WHERE table_name = 'DBCOLTYPETEST') + """.stripMargin + val rows = spark.read.jdbc(url1, query, properties).collect() + + rows.foreach { row => + val typeName = row.getString(1) + // For CHAR and VARCHAR, we also compare the max length + if (typeName.contains("CHAR")) { + val charMaxLength = row.getInt(2) + assert(expectedTypes(row.getString(0)) == s"$typeName($charMaxLength)") + } else { + assert(expectedTypes(row.getString(0)) == typeName) + } + } + } + + val data = Seq[Row](Row(1, "dave", "Boston", "electric cars")) val schema = StructType( StructField("id", IntegerType) :: StructField("first#name", StringType) :: - StructField("city", StringType) :: - StructField("descr", StringType) :: - Nil) + StructField("city", StringType) :: Nil) val df = spark.createDataFrame(sparkContext.parallelize(data), schema) - // Use database specific CHAR/VARCHAR types instead of String data type. 
- val createTableColTypes = "`first#name` VARCHAR(123), city CHAR(20)" - assert(JdbcUtils.schemaString(df.schema, url1, Option(createTableColTypes)) == - s""""id" INTEGER , "first#name" VARCHAR(123) , "city" CHAR(20) , "descr" TEXT """) - - // create the table with the user specified data types, and verify the data - df.write.option("createTableColumnTypes", createTableColTypes) - .jdbc(url1, "TEST.DBCOLTYPETEST", properties) - assert(spark.read.jdbc(url1, - """(select * from TEST.DBCOLTYPETEST where "city" ='Boston')""", properties).count() == 1) - - // verify the data types on the target table - val rows = spark.read.jdbc(url1, - """ - |(select type_name, CHARACTER_MAXIMUM_LENGTH - | from information_schema.COLUMNS where table_name = 'DBCOLTYPETEST') - """.stripMargin.replaceAll("\n", " "), - properties - ) - assert(rows.where("TYPE_NAME='VARCHAR'").head.getInt(1) == 123) - assert(rows.where("TYPE_NAME='CHAR'").head.getInt(1) == 20) + + // out-of-order + val expected1 = Map("id" -> "BIGINT", "first#name" -> "VARCHAR(123)", "city" -> "CHAR(20)") + testUserSpecifiedColTypes(df, "`first#name` VARCHAR(123), id BIGINT, city CHAR(20)", expected1) + // partial schema + val expected2 = Map("id" -> "INTEGER", "first#name" -> "VARCHAR(123)", "city" -> "CHAR(20)") + testUserSpecifiedColTypes(df, "`first#name` VARCHAR(123), city CHAR(20)", expected2) + + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + // should still respect the original column names + val expected = Map("id" -> "INTEGER", "first#name" -> "VARCHAR(123)", "city" -> "CLOB") + testUserSpecifiedColTypes(df, "`FiRsT#NaMe` VARCHAR(123)", expected) + } + + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + val schema = StructType( + StructField("id", IntegerType) :: + StructField("First#Name", StringType) :: + StructField("city", StringType) :: Nil) + val df = spark.createDataFrame(sparkContext.parallelize(data), schema) + val expected = Map("id" -> "INTEGER", "First#Name" -> "VARCHAR(123)", "city" -> "CLOB") + testUserSpecifiedColTypes(df, "`First#Name` VARCHAR(123)", expected) + } } test("SPARK-10849: jdbc CreateTableColumnTypes option with invalid data type") { @@ -413,30 +463,47 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) val msg = intercept[ParseException] { df.write.mode(SaveMode.Overwrite) - .option("createTableColumnTypes", "`name char(20)") + .option("createTableColumnTypes", "`name char(20)") // incorrectly quoted column .jdbc(url1, "TEST.USERDBTYPETEST", properties) }.getMessage() assert(msg.contains("no viable alternative at input")) } test("SPARK-10849: jdbc CreateTableColumnTypes duplicate columns") { - val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) - val msg = intercept[AnalysisException] { - df.write.mode(SaveMode.Overwrite) - .option("createTableColumnTypes", "name CHAR(20), id int, name VARCHAR(100)") - .jdbc(url1, "TEST.USERDBTYPETEST", properties) - }.getMessage() - assert(msg.contains("Found duplicate column(s) in createTableColumnTypes option value: name")) + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + val msg = intercept[AnalysisException] { + df.write.mode(SaveMode.Overwrite) + .option("createTableColumnTypes", "name CHAR(20), id int, NaMe VARCHAR(100)") + .jdbc(url1, "TEST.USERDBTYPETEST", properties) + }.getMessage() + assert(msg.contains( + "Found duplicate column(s) in createTableColumnTypes 
option value: name, NaMe")) + } } test("SPARK-10849: jdbc CreateTableColumnTypes invalid columns") { + // schema2 has the column "id" and "name" val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) - val msg = intercept[AnalysisException] { - df.write.mode(SaveMode.Overwrite) - .option("createTableColumnTypes", "firstName CHAR(20), id int, lastName VARCHAR(100)") - .jdbc(url1, "TEST.USERDBTYPETEST", properties) - }.getMessage() - assert(msg.contains( - "Found invalid column(s) in createTableColumnTypes option value: firstName, lastName")) + + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + val msg = intercept[AnalysisException] { + df.write.mode(SaveMode.Overwrite) + .option("createTableColumnTypes", "firstName CHAR(20), id int") + .jdbc(url1, "TEST.USERDBTYPETEST", properties) + }.getMessage() + assert(msg.contains("createTableColumnTypes option column firstName not found in " + + "schema struct")) + } + + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + val msg = intercept[AnalysisException] { + df.write.mode(SaveMode.Overwrite) + .option("createTableColumnTypes", "id int, Name VARCHAR(100)") + .jdbc(url1, "TEST.USERDBTYPETEST", properties) + }.getMessage() + assert(msg.contains("createTableColumnTypes option column Name not found in " + + "schema struct")) + } } } From 6f51d3ffbabc641098ad14a1649cda1839fda54a Mon Sep 17 00:00:00 2001 From: sureshthalamati Date: Thu, 23 Mar 2017 10:19:31 -0700 Subject: [PATCH 5/5] Addressing review comments. Fixed indendation and removed unused value --- .../spark/sql/execution/datasources/jdbc/JdbcUtils.scala | 4 ++-- .../scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 60ec1cba5965..774d1ba19432 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -706,8 +706,8 @@ object JdbcUtils extends Logging { * use in-place of the default data type. */ private def parseUserSpecifiedCreateTableColumnTypes( - df: DataFrame, - createTableColumnTypes: String): Map[String, String] = { + df: DataFrame, + createTableColumnTypes: String): Map[String, String] = { def typeName(f: StructField): String = { // char/varchar gets translated to string type. 
Real data type specified by the user // is available in the field metadata as HIVE_TYPE_STRING diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala index 57860e04b17d..bf1fd160704f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala @@ -390,9 +390,9 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { test("SPARK-10849: create table using user specified column type and verify on target table") { def testUserSpecifiedColTypes( - df: DataFrame, - createTableColTypes: String, - expectedTypes: Map[String, String]): Unit = { + df: DataFrame, + createTableColTypes: String, + expectedTypes: Map[String, String]): Unit = { df.write .mode(SaveMode.Overwrite) .option("createTableColumnTypes", createTableColTypes) @@ -418,7 +418,7 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { } } - val data = Seq[Row](Row(1, "dave", "Boston", "electric cars")) + val data = Seq[Row](Row(1, "dave", "Boston")) val schema = StructType( StructField("id", IntegerType) :: StructField("first#name", StringType) ::