Closed
27 commits
5f4c26b
scaffolding : basic read/write with jdbc connector
shivsood Jul 16, 2019
e238fc0
first draft implementation of write(append) flow with datasourcev2. …
shivsood Jul 19, 2019
ad17856
first draft implementation of dataframe write(append) flow with datas…
shivsood Jul 19, 2019
2ef896a
Readme added with high level project work items and plan
shivsood Jul 19, 2019
7f274d0
Merge branch 'dsv2_jdbc' of https://github.com/shivsood/spark into ds…
shivsood Jul 19, 2019
77448ab
scaffolding : basic read/write with jdbc connector
shivsood Jul 16, 2019
675083f
first draft implementation of write(append) flow with datasourcev2. …
shivsood Jul 19, 2019
90e1ad9
Readme added with high level project work items and plan
shivsood Jul 19, 2019
4a4a69d
Merge branch 'dsv2_jdbc' of https://github.com/shivsood/spark into ds…
shivsood Jul 20, 2019
5ddc5e9
hygiene fixes and comments clarifying that read implementation is dumm…
shivsood Jul 20, 2019
57baf76
cleaned up write(append) implementation. Append will not create a tab…
shivsood Jul 22, 2019
7246534
comments/questions on overwrite semantics and minor log fixes
shivsood Jul 22, 2019
c2d2707
first draft of read implementation. No support for partitioning or fi…
shivsood Jul 23, 2019
e15a9e6
scaffolding : basic read/write with jdbc connector
shivsood Jul 16, 2019
99f1a3b
first draft implementation of write(append) flow with datasourcev2. …
shivsood Jul 19, 2019
9e4d2d6
Readme added with high level project work items and plan
shivsood Jul 19, 2019
72b949d
scaffolding : basic read/write with jdbc connector
shivsood Jul 16, 2019
f7b5e4d
first draft implementation of write(append) flow with datasourcev2. …
shivsood Jul 19, 2019
4a53903
hygiene fixes and comments clarifying that read implementation is dumm…
shivsood Jul 20, 2019
cc7af99
cleaned up write(append) implementation. Append will not create a tab…
shivsood Jul 22, 2019
247c72d
comments/questions on overwrite semantics and minor log fixes
shivsood Jul 22, 2019
508dbc4
first draft of read implementation. No support for partitioning or fi…
shivsood Jul 23, 2019
cf3696a
merge
shivsood Jul 24, 2019
9d4093f
cleaner, more structured read implementation
shivsood Jul 25, 2019
1446faa
Fixed for OverWrite(with truncate). Test added in MsSqlServerIntegrat…
shivsood Jul 31, 2019
d1e3142
Implementation for Overwrite(truncate=false). Semantics is DROP TABLE …
shivsood Aug 2, 2019
38b80e2
minor readme updates
shivsood Aug 2, 2019
20 changes: 20 additions & 0 deletions examples/src/main/python/sql/datasourcev2.py
@@ -0,0 +1,20 @@
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession\
.builder\
.appName("DV2Test")\
.getOrCreate()


df = spark.read.format("csv").options(header='true').load("/home/shivsood/myspark_scripts/text.csv")
df.show(5)

# Read back through the experimental jdbcv2 source (connection options omitted in this draft).
df_jdbc = spark.read.options(header='true').format("jdbcv2").load()
df_jdbc.show(3)


# Write through the jdbcv2 source (left commented out in this draft).
#df.filter( col("rollnum") == "38" ).write.format("jdbcv2").save()

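The Python example above loads the experimental jdbcv2 source without any connection options. The Scala integration tests later in this PR pass the JDBC URL and table explicitly; below is a minimal Scala sketch of that same read/append flow, assuming a reachable SQL Server endpoint and an existing dsv2testTbl table (both placeholders here, not part of the PR).

import org.apache.spark.sql.SparkSession

object Dsv2JdbcExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DV2Test").getOrCreate()

    // Placeholder connection string; substitute a real SQL Server endpoint.
    val jdbcUrl = "jdbc:sqlserver://localhost:1433;database=test;user=sa;password=secret"

    // Read through the experimental jdbcv2 source, mirroring the integration tests.
    val df = spark.read
      .format("jdbcv2")
      .option("url", jdbcUrl)
      .option("dbtable", "dsv2testTbl")
      .load()
    df.show(3)

    // Append the same rows back through the jdbcv2 write path.
    df.write
      .format("jdbcv2")
      .mode("append")
      .option("url", jdbcUrl)
      .option("dbtable", "dsv2testTbl")
      .save()

    spark.stop()
  }
}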
MsSqlServerIntegrationSuite.scala
@@ -18,10 +18,12 @@
package org.apache.spark.sql.jdbc

import java.math.BigDecimal
import java.sql.{Connection, Date, Timestamp}
import java.sql.{Connection, Date, Struct, Timestamp}
import java.util.Properties

import org.apache.spark.tags.DockerTest
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types._

@DockerTest
class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
@@ -99,6 +101,155 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
|'the', 'lazy',
|'dog')
""".stripMargin).executeUpdate()
conn.prepareStatement(
"""
|CREATE TABLE dsv2testTbl (
|i NVarChar(20),
|j INT,
|k NVarChar(20))
""".stripMargin).executeUpdate()
conn.prepareStatement(
"""
|INSERT INTO dsv2testTbl VALUES (
|'The number',
|1,
|'Rocks!')
""".stripMargin).executeUpdate()
}

def create_test_df() : DataFrame = {
val schema:StructType = StructType(
Seq(StructField ("i", IntegerType, true),
StructField ("j", IntegerType, true),
StructField ("k", IntegerType, true))
)
val data:Seq[Row] = Seq(
Row(1,1,2),
Row(1,2,3)
)

spark.createDataFrame(spark.sparkContext.parallelize(data),schema)
}

test("JDBCV2 read test") {
// Read table with JDBC and JDBCV2. Check that same row counts are returned.
val df1 = spark.read
.format("jdbc")
.option("url",jdbcUrl)
.option("dbtable","dsv2testTbl")
.load()
val numberOfRows = df1.count
df1.show(10)

val df2 = spark.read
.format("jdbcv2")
.option("url",jdbcUrl)
.option("dbtable","dsv2testTbl")
.load()
df2.show(10)
// Try column pruning. This currently fails with an error.
//df2.select("i").show(10)
assert(df2.count == numberOfRows)
}

test("JDBCV2 write append test") {
// Read 1 row using JDBC. Write(append) this row using JDBCV2.
logInfo(s"***dsv2-flow-test*** Write(append) can append to a table with the same schema")
val df1 = spark.read
.format("jdbc")
.option("url",jdbcUrl)
.option("dbtable", "dsv2testTbl")
.load()
df1.show(10)
assert(df1.count == 1)

df1.write.format("jdbcv2")
.mode("append")
.option("url",jdbcUrl)
.option("dbtable", "dsv2testTbl")
.save()
val df2 = spark.read
.format("jdbc")
.option("url",jdbcUrl)
.option("dbtable", "dsv2testTbl")
.load()
df2.show(10)
assert(df2.count == 2)

// Create a df with a different schema and append it to the existing table. Should fail.
// TODO : Writing a dataframe with a different schema should fail. Why is this passing?
logInfo(s"***dsv2-flow-test*** Write(append) fails writing to a table with a different schema")
val df_new = create_test_df()
df_new.write
.format("jdbcv2")
.mode("append")
.option("url", jdbcUrl)
.option("dbtable", "dsv2testTbl")
.save()
val df2_new = spark.read
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "dsv2testTbl")
.load()
df2_new.show(10)
assert(df2_new.count == 4)
}

test("JDBCV2 write overwrite(truncate) test") {
// Overwrite with truncate - check that the original table schema is retained.
val df_ori_before = spark
.read
.format("jdbc")
.option("url",jdbcUrl)
.option("dbtable","dsv2testTbl")
.load()
df_ori_before.show()
val cl_types_orignal = df_ori_before.schema.fields.map(f=>f.dataType)
val t_before = cl_types_orignal.mkString(":")
logInfo(s"***dsv2-flow-test*** Column type before was $t_before")

val df_new = create_test_df()
df_new.write
.format("jdbcv2")
.mode("overwrite")
.option("url",jdbcUrl)
.option("truncate","true")
.option("dbtable","dsv2testTbl")
.save()

val df_ori_after = spark
.read
.format("jdbc")
.option("url",jdbcUrl)
.option("dbtable","dsv2testTbl")
.load()
df_ori_after.show()
val cl_types_after = df_ori_after.schema.fields.map(f=>f.dataType)
val t_after = cl_types_after.mkString(":")
logInfo(s"***dsv2-flow-test*** Column type after is $t_after")

assert(df_ori_after.count == df_ori_before.count)
assert(cl_types_after.deep == cl_types_orignal.deep)
}

import org.apache.spark.sql.functions._
test("JDBCV2 write overwrite test") {
// Overwrite (w/o truncate) an existing table with a new schema and values.
val df1 = create_test_df()
df1.filter(col("j") > lit(1)).write
.format("jdbcv2")
.mode("overwrite")
.option("url",jdbcUrl)
.option("truncate","false")
.option("dbtable","dsv2testTbl")
.save()
val df2 = spark.read
.format("jdbc")
.option("url",jdbcUrl)
.option("dbtable","dsv2testTbl")
.load()
df2.show()
assert(df1.count - 1 == df2.count)
}

test("Basic test") {
META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -9,3 +9,4 @@ org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
org.apache.spark.sql.execution.streaming.sources.RateStreamProvider
org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider
org.apache.spark.sql.execution.datasources.binaryfile.BinaryFileFormat
org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCDataSourceV2
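The new line above registers org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCDataSourceV2 with Java's ServiceLoader, which is how spark.read.format("jdbcv2") resolves the short name to the provider class. The provider itself is not part of this diff; the following is a minimal, hypothetical sketch of just the registration half, assuming the class exposes the short name the tests use.

package org.apache.spark.sql.execution.datasources.v2.jdbc

import org.apache.spark.sql.sources.DataSourceRegister

// Hypothetical skeleton: only the DataSourceRegister piece is sketched here.
// The real class in this PR also implements the DataSourceV2 read/write interfaces.
class JDBCDataSourceV2 extends DataSourceRegister {
  // Short name matched against format("jdbcv2") after ServiceLoader discovery.
  override def shortName(): String = "jdbcv2"
}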
org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -729,18 +729,28 @@ object JdbcUtils extends Logging {
}

/**
* Compute the schema string for this RDD.
* Compute the schema string for this RDD, given its schema as a StructType.
*/
def schemaString(
df: DataFrame,
url: String,
createTableColumnTypes: Option[String] = None): String = {
val userSpecifiedColTypesMap = createTableColumnTypes
.map(parseUserSpecifiedCreateTableColumnTypes(
df.sparkSession.sessionState.conf.resolver,
df.sparkSession.sessionState.conf.caseSensitiveAnalysis,
df.schema, _)).getOrElse(Map.empty[String, String])

schemaString(df.schema, url, userSpecifiedColTypesMap)
}

def schemaString(
cols: StructType,
url: String,
userSpecifiedColTypesMap : Map[String, String]): String = {
val sb = new StringBuilder()
val dialect = JdbcDialects.get(url)
val userSpecifiedColTypesMap = createTableColumnTypes
.map(parseUserSpecifiedCreateTableColumnTypes(df, _))
.getOrElse(Map.empty[String, String])
df.schema.fields.foreach { field =>
cols.fields.foreach { field =>
val name = dialect.quoteIdentifier(field.name)
val typ = userSpecifiedColTypesMap
.getOrElse(field.name, getJdbcType(field.dataType, dialect).databaseTypeDefinition)
@@ -756,7 +766,9 @@
* use in-place of the default data type.
*/
private def parseUserSpecifiedCreateTableColumnTypes(
df: DataFrame,
resolver : Resolver,
caseType : Boolean,
cols : StructType,
createTableColumnTypes: String): Map[String, String] = {
def typeName(f: StructField): String = {
// char/varchar gets translated to string type. Real data type specified by the user
@@ -769,24 +781,22 @@
}

val userSchema = CatalystSqlParser.parseTableSchema(createTableColumnTypes)
val nameEquality = df.sparkSession.sessionState.conf.resolver

// checks duplicate columns in the user specified column types.
SchemaUtils.checkColumnNameDuplication(
userSchema.map(_.name), "in the createTableColumnTypes option value", nameEquality)
userSchema.map(_.name), "in the createTableColumnTypes option value", resolver)

// checks if user specified column names exist in the DataFrame schema
userSchema.fieldNames.foreach { col =>
df.schema.find(f => nameEquality(f.name, col)).getOrElse {
cols.find(f => resolver(f.name, col)).getOrElse {
throw new AnalysisException(
s"createTableColumnTypes option column $col not found in schema " +
df.schema.catalogString)
cols.catalogString)
}
}

val userSchemaMap = userSchema.fields.map(f => f.name -> typeName(f)).toMap
val isCaseSensitive = df.sparkSession.sessionState.conf.caseSensitiveAnalysis
if (isCaseSensitive) userSchemaMap else CaseInsensitiveMap(userSchemaMap)
if (caseType) userSchemaMap else CaseInsensitiveMap(userSchemaMap)
}

/**
@@ -855,6 +865,30 @@
options: JdbcOptionsInWrite): Unit = {
val strSchema = schemaString(
df, options.url, options.createTableColumnTypes)

createTableWithSchemaString(conn, options, strSchema)
}

def createTable(
conn: Connection,
schema : StructType,
options: JdbcOptionsInWrite): Unit = {

val userSpecifiedColTypesMap = options.createTableColumnTypes
.map(parseUserSpecifiedCreateTableColumnTypes(
org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution,
true,
schema, _)).getOrElse(Map.empty[String, String])
val strSchema = schemaString(
schema, options.url, userSpecifiedColTypesMap)

createTableWithSchemaString(conn, options, strSchema)
}

def createTableWithSchemaString(
conn : Connection,
options : JdbcOptionsInWrite,
strSchema : String) : Unit = {
val table = options.table
val createTableOptions = options.createTableOptions
// Create the table if the table does not exist.
@@ -870,4 +904,82 @@
statement.close()
}
}

/**
* Inserts a row into the database table represented by the connection and options.
* Added to JdbcUtils to support the DSV2 JDBC adapter.
*
* @param conn JDBC connection to the target database.
* @param record InternalRow to be inserted.
* @param options write options (url, table, etc.).
* @param schema schema of the row being written.
* @throws IllegalArgumentException if the schema contains an unsupported type.
* TODO : Refactor to remove makeIRSetter and use makeSetter with InternalRows.
* TODO : Refactor to remove v2 imports/types from this pure DSV1 file.
* TODO : Support for transactions and isolation levels.
* TODO : Just add to the statement here; commit should happen outside of this function.
*/
def saveRow(conn: Connection, record: InternalRow,
options : JdbcOptionsInWrite, schema : StructType) : Unit = {
val dialect = JdbcDialects.get(options.url)
// Schema of the existing target table, if it can be fetched from the database.
val tableSchema = JdbcUtils.getSchemaOption(conn, options)

val insertStatement = JdbcUtils.getInsertStatement(options.table,
schema, tableSchema, true, dialect)
val stmt = conn.prepareStatement(insertStatement)
logInfo(s"***dsv2-flows*** insertStatement is $insertStatement")
val setters = schema.fields.map(f => makeIRSetter(conn, dialect, f.dataType))
val nullTypes = schema.fields.map(f => getJdbcType(f.dataType, dialect).jdbcNullType)
val numFields = schema.fields.length

var colNum = 0
while (colNum < numFields) {
logInfo("***dsv2-flows*** stmt prep field " + colNum)
if (record.isNullAt(colNum)) {
stmt.setNull(colNum + 1, nullTypes(colNum))
} else {
setters(colNum).apply(stmt, record, colNum)
}
colNum = colNum + 1
}
stmt.execute()
}

private type JDBCValueSetterIR = (PreparedStatement, InternalRow, Int) => Unit

private def makeIRSetter(conn: Connection, dialect: JdbcDialect,
dataType: DataType): JDBCValueSetterIR = dataType match {
case IntegerType =>
(stmt: PreparedStatement, row: InternalRow, pos: Int) =>
stmt.setInt(pos + 1, row.getInt(pos))

case LongType =>
(stmt: PreparedStatement, row: InternalRow, pos: Int) =>
stmt.setLong(pos + 1, row.getLong(pos))

case DoubleType =>
(stmt: PreparedStatement, row: InternalRow, pos: Int) =>
stmt.setDouble(pos + 1, row.getDouble(pos))

case FloatType =>
(stmt: PreparedStatement, row: InternalRow, pos: Int) =>
stmt.setFloat(pos + 1, row.getFloat(pos))

case ShortType =>
(stmt: PreparedStatement, row: InternalRow, pos: Int) =>
stmt.setShort(pos + 1, row.getShort(pos))

case ByteType =>
(stmt: PreparedStatement, row: InternalRow, pos: Int) =>
stmt.setByte(pos + 1, row.getByte(pos))

case BooleanType =>
(stmt: PreparedStatement, row: InternalRow, pos: Int) =>
stmt.setBoolean(pos + 1, row.getBoolean(pos))

case StringType =>
(stmt: PreparedStatement, row: InternalRow, pos: Int) =>
stmt.setString(pos + 1, row.getString(pos))
case _ =>
throw new IllegalArgumentException(s"Not supported ${dataType.catalogString}")
}
}
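For context, saveRow above issues one prepared INSERT per InternalRow, which is what a DSV2 partition writer would call row by row (the DataWriter classes of this PR are outside this diff). Below is a minimal, hypothetical sketch of such a caller, assuming it lives alongside the connector so the execution-internal JdbcUtils and JdbcOptionsInWrite types are accessible; the URL, credentials and driver are placeholders.

package org.apache.spark.sql.execution.datasources.v2.jdbc

import java.sql.Connection

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.jdbc.{JdbcOptionsInWrite, JdbcUtils}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical sketch of a per-partition write loop built on the new saveRow helper.
object SaveRowSketch {
  def writePartition(rows: Iterator[InternalRow]): Unit = {
    // Schema of the hypothetical target table (mirrors dsv2testTbl in the tests).
    val schema = StructType(Seq(
      StructField("i", StringType),
      StructField("j", IntegerType),
      StructField("k", StringType)))
    // Placeholder connection options; url and dbtable are the ones the tests pass.
    // The SQL Server JDBC driver jar is assumed to be on the classpath.
    val options = new JdbcOptionsInWrite(Map(
      "url" -> "jdbc:sqlserver://localhost:1433;database=test;user=sa;password=secret",
      "dbtable" -> "dsv2testTbl",
      "driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver"))
    val conn: Connection = JdbcUtils.createConnectionFactory(options)()
    try {
      // One INSERT per row; see the TODOs above about batching and commits.
      rows.foreach(row => JdbcUtils.saveRow(conn, row, options, schema))
    } finally {
      conn.close()
    }
  }
}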