30 changes: 27 additions & 3 deletions docs/sql-programming-guide.md
Expand Up @@ -1302,9 +1302,33 @@ the following case-insensitive options:
<tr>
<td><code>dbtable</code></td>
<td>
The JDBC table that should be read. Note that anything that is valid in a <code>FROM</code> clause of
a SQL query can be used. For example, instead of a full table you could also use a
subquery in parentheses.
The JDBC table that should be read from or written into. Note that when using it in the read
path, anything that is valid in a <code>FROM</code> clause of a SQL query can be used.
For example, instead of a full table you could also use a subquery in parentheses. It is not
allowed to specify the <code>dbtable</code> and <code>query</code> options at the same time.
</td>
</tr>
<tr>
<td><code>query</code></td>
<td>
A query that will be used to read data into Spark. The specified query will be parenthesized and used
as a subquery in the <code>FROM</code> clause. Spark will also assign an alias to the subquery clause.
As an example, Spark will issue a query of the following form to the JDBC source.<br><br>
<code> SELECT &lt;columns&gt; FROM (&lt;user_specified_query&gt;) spark_gen_alias</code><br><br>
Below are a couple of restrictions when using this option.<br>
<ol>
<li> It is not allowed to specify the <code>dbtable</code> and <code>query</code> options at the same time. </li>
<li> It is not allowed to specify the <code>query</code> and <code>partitionColumn</code> options at the same time. When specifying
the <code>partitionColumn</code> option is required, the subquery can be specified using the <code>dbtable</code> option instead and
partition columns can be qualified using the subquery alias provided as part of <code>dbtable</code>. <br>
Example:<br>
<code>
spark.read.format("jdbc")<br>
&nbsp&nbsp .option("dbtable", "(select c1, c2 from t1) as subq")<br>
&nbsp&nbsp .option("partitionColumn", "subq.c1"<br>
&nbsp&nbsp .load()
</code></li>
</ol>
</td>
</tr>

Expand Down
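For illustration, a minimal read that exercises the new query option could look like the sketch below; the URL, credentials, and query text are placeholders, not part of this change:

val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/postgres")
  .option("user", "testUser")
  .option("password", "testPass")
  .option("query", "select c1, c2 from t1 where c1 > 10")
  .load()

Spark then wraps the supplied text in parentheses and attaches a generated alias before sending the statement to the JDBC source.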
JDBCOptions.scala
Expand Up @@ -27,7 +27,7 @@ import org.apache.spark.sql.types.StructType
* Options for the JDBC data source.
*/
class JDBCOptions(
@transient private val parameters: CaseInsensitiveMap[String])
@transient val parameters: CaseInsensitiveMap[String])
extends Serializable {

import JDBCOptions._
Expand Down Expand Up @@ -65,11 +65,31 @@ class JDBCOptions(
// Required parameters
// ------------------------------------------------------------
require(parameters.isDefinedAt(JDBC_URL), s"Option '$JDBC_URL' is required.")
require(parameters.isDefinedAt(JDBC_TABLE_NAME), s"Option '$JDBC_TABLE_NAME' is required.")
// a JDBC URL
val url = parameters(JDBC_URL)
// name of table
val table = parameters(JDBC_TABLE_NAME)
// table name or a table subquery.
val tableOrQuery = (parameters.get(JDBC_TABLE_NAME), parameters.get(JDBC_QUERY_STRING)) match {
case (Some(name), Some(subquery)) =>
throw new IllegalArgumentException(
s"Both '$JDBC_TABLE_NAME' and '$JDBC_QUERY_STRING' can not be specified at the same time."
)
case (None, None) =>
throw new IllegalArgumentException(
s"Option '$JDBC_TABLE_NAME' or '$JDBC_QUERY_STRING' is required."
)
case (Some(name), None) =>
if (name.isEmpty) {
throw new IllegalArgumentException(s"Option '$JDBC_TABLE_NAME' can not be empty.")
} else {
name.trim
}
case (None, Some(subquery)) =>
if (subquery.isEmpty) {
throw new IllegalArgumentException(s"Option `$JDBC_QUERY_STRING` can not be empty.")
} else {
s"(${subquery}) __SPARK_GEN_JDBC_SUBQUERY_NAME_${curId.getAndIncrement()}"
}
}
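// Illustrative only: with query = "select c1, c2 from t1", the value produced above is
// "(select c1, c2 from t1) __SPARK_GEN_JDBC_SUBQUERY_NAME_0" (the numeric suffix comes from curId).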

// ------------------------------------------------------------
// Optional parameters
Expand Down Expand Up @@ -109,6 +129,20 @@ class JDBCOptions(
s"When reading JDBC data sources, users need to specify all or none for the following " +
s"options: '$JDBC_PARTITION_COLUMN', '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND', " +
s"and '$JDBC_NUM_PARTITIONS'")

require(!(parameters.get(JDBC_QUERY_STRING).isDefined && partitionColumn.isDefined),
s"""
|Options '$JDBC_QUERY_STRING' and '$JDBC_PARTITION_COLUMN' can not be specified together.
|Please define the query using `$JDBC_TABLE_NAME` option instead and make sure to qualify
|the partition columns using the supplied subquery alias to resolve any ambiguity.
|Example :
|spark.read.format("jdbc")
| .option("dbtable", "(select c1, c2 from t1) as subq")
| .option("partitionColumn", "subq.c1"
| .load()
Member

Great! Please add them to the doc as I mentioned above?

""".stripMargin
)
Member

Can't we define these two parameters simultaneously? How about the case where the columns in the query output contain a partition column?

Contributor Author

@maropu Since we auto-generate a subquery alias here for ease of use, we are disallowing the query option together with partition columns, as users wouldn't know how to qualify the partition columns given that the subquery alias is generated implicitly. In this case, we ask them to use the existing dbtable option to specify the query, where they are in control of specifying the alias themselves. Another option I considered was to introduce "queryAlias" as another option, but I decided to avoid it for simplicity.

Member

@maropu The new option query is just syntactic sugar to simplify the work for many basic JDBC users. We can improve it in the future, for example by parsing the user-specified query and making all the other options work.

Member
@maropu maropu Jun 24, 2018

I was thinking of the case below (this is accepted with dbtable, but not with query?):

postgres=# select * from t;
 c0 | c1 | c2
----+----+----
  1 |  1 |  1
  2 |  2 |  2
  3 |  3 |  3
(3 rows)

scala> :paste
val df = spark.read
  .format("jdbc")
  .option("driver", "org.postgresql.Driver")
  .option("url", "jdbc:postgresql://localhost:5432/postgres?user=maropu")
  .option("dbtable", "(select c0 p0, c1 p1, c2 p2 from t where c0 > 1) t")
  .option("partitionColumn", "p2")
  .option("lowerBound", "1")
  .option("upperBound", "3")
  .option("numPartitions", "2")
  .load()

scala> df.show
+---+---+---+                                                                   
| p0| p1| p2|
+---+---+---+
|  2|  2|  2|
|  3|  3|  3|
+---+---+---+

Member

@dilipbiswal Could you reply @maropu with an example?

Contributor Author

@maropu Currently we disallow it to be on the safe side. Let's take your example. When using the query option to pass on the query, we basically expect the users to supply

select c0 p0, c1 p1, c2 p2 from t where c0 > 1

In Spark, we will parenthesize the query and add an alias to conform to the table subquery syntax. Given the user input the above query, they could decide to qualify the partition column names with the table name, so they could do the following:

val df = spark.read
  .format("jdbc")
  .option("driver", "org.postgresql.Driver")
  .option("url", "jdbc:postgresql://localhost:5432/postgres?user=maropu")
  .option("query", "select c0 p0, c1 p1, c2 p2 from t where c0 > 1")
  .option("partitionColumn", "t.p2")  ==> User qualifies the column names.
  .option("lowerBound", "1")
  .option("upperBound", "3")
  .option("numPartitions", "2")
  .load()

In this case we will end up generating a query of the following form:

select * from (select c0 p0, c1 p1, c2 p2 from t where c0 > 1) __SPARK_GEN_ALIAS where t.p2 >= 1 and t.p2 <=3

However, this would be an invalid query. In the query option, it's possible to specify a complex query involving joins.

That's the reason we disallow it, to be on the safe side. With the dbtable option, users are responsible for explicitly specifying the alias and would know how to qualify the partition columns.

Let's see if we can improve this in the future. If you have some ideas, please let us know.

Member

aha, ok. Thanks for the kind explanation.


val fetchSize = {
val size = parameters.getOrElse(JDBC_BATCH_FETCH_SIZE, "0").toInt
require(size >= 0,
Expand Down Expand Up @@ -149,7 +183,30 @@ class JDBCOptions(
val sessionInitStatement = parameters.get(JDBC_SESSION_INIT_STATEMENT)
}

class JdbcOptionsInWrite(
@transient override val parameters: CaseInsensitiveMap[String])
extends JDBCOptions(parameters) {

import JDBCOptions._

def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))

def this(url: String, table: String, parameters: Map[String, String]) = {
this(CaseInsensitiveMap(parameters ++ Map(
JDBCOptions.JDBC_URL -> url,
JDBCOptions.JDBC_TABLE_NAME -> table)))
}

require(
parameters.get(JDBC_TABLE_NAME).isDefined,
s"Option '$JDBC_TABLE_NAME' is required. " +
s"Option '$JDBC_QUERY_STRING' is not applicable while writing.")

val table = parameters(JDBC_TABLE_NAME)
}

object JDBCOptions {
private val curId = new java.util.concurrent.atomic.AtomicLong(0L)
private val jdbcOptionNames = collection.mutable.Set[String]()

private def newOption(name: String): String = {
Expand All @@ -159,6 +216,7 @@ object JDBCOptions {

val JDBC_URL = newOption("url")
val JDBC_TABLE_NAME = newOption("dbtable")
val JDBC_QUERY_STRING = newOption("query")
val JDBC_DRIVER_CLASS = newOption("driver")
val JDBC_PARTITION_COLUMN = newOption("partitionColumn")
val JDBC_LOWER_BOUND = newOption("lowerBound")
Expand Down
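For context, the write path introduced above (JdbcOptionsInWrite) still accepts only dbtable; a minimal write sketch with a placeholder URL, credentials, and table name might look like:

df.write
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/postgres")
  .option("user", "testUser")
  .option("password", "testPass")
  .option("dbtable", "test.people_copy")
  .save()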
JDBCRDD.scala
Expand Up @@ -51,7 +51,7 @@ object JDBCRDD extends Logging {
*/
def resolveTable(options: JDBCOptions): StructType = {
val url = options.url
val table = options.table
val table = options.tableOrQuery
val dialect = JdbcDialects.get(url)
val conn: Connection = JdbcUtils.createConnectionFactory(options)()
try {
Expand Down Expand Up @@ -296,7 +296,7 @@ private[jdbc] class JDBCRDD(

val myWhereClause = getWhereClause(part)

val sqlText = s"SELECT $columnList FROM ${options.table} $myWhereClause"
val sqlText = s"SELECT $columnList FROM ${options.tableOrQuery} $myWhereClause"
stmt = conn.prepareStatement(sqlText,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
stmt.setFetchSize(options.fetchSize)
Expand Down
JDBCRelation.scala
Expand Up @@ -189,12 +189,12 @@ private[sql] case class JDBCRelation(
override def insert(data: DataFrame, overwrite: Boolean): Unit = {
data.write
.mode(if (overwrite) SaveMode.Overwrite else SaveMode.Append)
.jdbc(jdbcOptions.url, jdbcOptions.table, jdbcOptions.asProperties)
.jdbc(jdbcOptions.url, jdbcOptions.tableOrQuery, jdbcOptions.asProperties)
}

override def toString: String = {
val partitioningInfo = if (parts.nonEmpty) s" [numPartitions=${parts.length}]" else ""
// credentials should not be included in the plan output, table information is sufficient.
s"JDBCRelation(${jdbcOptions.table})" + partitioningInfo
s"JDBCRelation(${jdbcOptions.tableOrQuery})" + partitioningInfo
}
}
JdbcRelationProvider.scala
Expand Up @@ -59,7 +59,7 @@ class JdbcRelationProvider extends CreatableRelationProvider
mode: SaveMode,
parameters: Map[String, String],
df: DataFrame): BaseRelation = {
val options = new JDBCOptions(parameters)
val options = new JdbcOptionsInWrite(parameters)
val isCaseSensitive = sqlContext.conf.caseSensitiveAnalysis

val conn = JdbcUtils.createConnectionFactory(options)()
Expand All @@ -86,7 +86,8 @@ class JdbcRelationProvider extends CreatableRelationProvider

case SaveMode.ErrorIfExists =>
throw new AnalysisException(
s"Table or view '${options.table}' already exists. SaveMode: ErrorIfExists.")
s"Table or view '${options.table}' already exists. " +
s"SaveMode: ErrorIfExists.")
Member

No change, right?


case SaveMode.Ignore =>
// With `SaveMode.Ignore` mode, if table already exists, the save operation is expected
Expand Down
JdbcUtils.scala
Expand Up @@ -67,7 +67,7 @@ object JdbcUtils extends Logging {
/**
* Returns true if the table already exists in the JDBC database.
*/
def tableExists(conn: Connection, options: JDBCOptions): Boolean = {
def tableExists(conn: Connection, options: JdbcOptionsInWrite): Boolean = {
val dialect = JdbcDialects.get(options.url)

// Somewhat hacky, but there isn't a good way to identify whether a table exists for all
Expand Down Expand Up @@ -100,7 +100,7 @@ object JdbcUtils extends Logging {
/**
* Truncates a table from the JDBC database without side effects.
*/
def truncateTable(conn: Connection, options: JDBCOptions): Unit = {
def truncateTable(conn: Connection, options: JdbcOptionsInWrite): Unit = {
val dialect = JdbcDialects.get(options.url)
val statement = conn.createStatement
try {
Expand Down Expand Up @@ -255,7 +255,7 @@ object JdbcUtils extends Logging {
val dialect = JdbcDialects.get(options.url)

try {
val statement = conn.prepareStatement(dialect.getSchemaQuery(options.table))
val statement = conn.prepareStatement(dialect.getSchemaQuery(options.tableOrQuery))
try {
statement.setQueryTimeout(options.queryTimeout)
Some(getSchema(statement.executeQuery(), dialect))
Expand Down Expand Up @@ -809,7 +809,7 @@ object JdbcUtils extends Logging {
df: DataFrame,
tableSchema: Option[StructType],
isCaseSensitive: Boolean,
options: JDBCOptions): Unit = {
options: JdbcOptionsInWrite): Unit = {
val url = options.url
val table = options.table
val dialect = JdbcDialects.get(url)
Expand Down Expand Up @@ -838,7 +838,7 @@ object JdbcUtils extends Logging {
def createTable(
conn: Connection,
df: DataFrame,
options: JDBCOptions): Unit = {
options: JdbcOptionsInWrite): Unit = {
val strSchema = schemaString(
df, options.url, options.createTableColumnTypes)
val table = options.table
Expand Down
94 changes: 91 additions & 3 deletions sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
Expand Up @@ -25,7 +25,7 @@ import org.h2.jdbc.JdbcSQLException
import org.scalatest.{BeforeAndAfter, PrivateMethodTester}

import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.DataSourceScanExec
Expand All @@ -39,7 +39,7 @@ import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

class JDBCSuite extends SparkFunSuite
class JDBCSuite extends QueryTest
with BeforeAndAfter with PrivateMethodTester with SharedSQLContext {
import testImplicits._

Expand Down Expand Up @@ -1099,7 +1099,7 @@ class JDBCSuite extends SparkFunSuite
test("SPARK-19318: Connection properties keys should be case-sensitive.") {
def testJdbcOptions(options: JDBCOptions): Unit = {
// Spark JDBC data source options are case-insensitive
assert(options.table == "t1")
assert(options.tableOrQuery == "t1")
// When we convert it to properties, it should be case-sensitive.
assert(options.asProperties.size == 3)
assert(options.asProperties.get("customkey") == null)
Expand Down Expand Up @@ -1255,4 +1255,92 @@ class JDBCSuite extends SparkFunSuite
testIncorrectJdbcPartitionColumn(testH2Dialect.quoteIdentifier("ThEiD"))
}
}

test("query JDBC option - negative tests") {
val query = "SELECT * FROM test.people WHERE theid = 1"
// load path
val e1 = intercept[RuntimeException] {
val df = spark.read.format("jdbc")
.option("Url", urlWithUserAndPass)
.option("query", query)
Contributor Author

@HyukjinKwon Thanks. I will update the doc.

.option("dbtable", "test.people")
.load()
}.getMessage
assert(e1.contains("Both 'dbtable' and 'query' can not be specified at the same time."))

// jdbc api path
val properties = new Properties()
properties.setProperty(JDBCOptions.JDBC_QUERY_STRING, query)
val e2 = intercept[RuntimeException] {
spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", properties).collect()
}.getMessage
assert(e2.contains("Both 'dbtable' and 'query' can not be specified at the same time."))

val e3 = intercept[RuntimeException] {
sql(
s"""
|CREATE OR REPLACE TEMPORARY VIEW queryOption
|USING org.apache.spark.sql.jdbc
|OPTIONS (url '$url', query '$query', dbtable 'TEST.PEOPLE',
| user 'testUser', password 'testPass')
""".stripMargin.replaceAll("\n", " "))
}.getMessage
assert(e3.contains("Both 'dbtable' and 'query' can not be specified at the same time."))

val e4 = intercept[RuntimeException] {
val df = spark.read.format("jdbc")
.option("Url", urlWithUserAndPass)
.option("query", "")
.load()
}.getMessage
assert(e4.contains("Option `query` can not be empty."))

// Options query and partitionColumn are not allowed together.
val expectedErrorMsg =
s"""
|Options 'query' and 'partitionColumn' can not be specified together.
|Please define the query using `dbtable` option instead and make sure to qualify
|the partition columns using the supplied subquery alias to resolve any ambiguity.
|Example :
|spark.read.format("jdbc")
| .option("dbtable", "(select c1, c2 from t1) as subq")
| .option("partitionColumn", "subq.c1"
| .load()
""".stripMargin
val e5 = intercept[RuntimeException] {
sql(
s"""
|CREATE OR REPLACE TEMPORARY VIEW queryOption
|USING org.apache.spark.sql.jdbc
|OPTIONS (url '$url', query '$query', user 'testUser', password 'testPass',
| partitionColumn 'THEID', lowerBound '1', upperBound '4', numPartitions '3')
""".stripMargin.replaceAll("\n", " "))
}.getMessage
assert(e5.contains(expectedErrorMsg))
}

test("query JDBC option") {
val query = "SELECT name, theid FROM test.people WHERE theid = 1"
// query option to pass on the query string.
val df = spark.read.format("jdbc")
.option("Url", urlWithUserAndPass)
.option("query", query)
.load()
checkAnswer(
df,
Row("fred", 1) :: Nil)

// query option in the create table path.
sql(
s"""
|CREATE OR REPLACE TEMPORARY VIEW queryOption
|USING org.apache.spark.sql.jdbc
|OPTIONS (url '$url', query '$query', user 'testUser', password 'testPass')
""".stripMargin.replaceAll("\n", " "))

checkAnswer(
sql("select name, theid from queryOption"),
Row("fred", 1) :: Nil)

}
}
JDBCWriteSuite.scala
Expand Up @@ -293,13 +293,23 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
test("save errors if dbtable is not specified") {
val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)

val e = intercept[RuntimeException] {
val e1 = intercept[RuntimeException] {
df.write.format("jdbc")
.option("url", url1)
.options(properties.asScala)
.save()
}.getMessage
assert(e1.contains("Option 'dbtable' or 'query' is required"))

val e2 = intercept[RuntimeException] {
df.write.format("jdbc")
.option("url", url1)
.options(properties.asScala)
.option("query", "select * from TEST.SAVETEST")
.save()
}.getMessage
assert(e.contains("Option 'dbtable' is required"))
val msg = "Option 'dbtable' is required. Option 'query' is not applicable while writing."
assert(e2.contains(msg))
}

test("save errors if wrong user/password combination") {
Expand Down