diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 196b814420be..7c4ef41cc890 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1302,9 +1302,33 @@ the following case-insensitive options:
<td><code>dbtable</code></td>
<td>
- The JDBC table that should be read. Note that anything that is valid in a FROM clause of
- a SQL query can be used. For example, instead of a full table you could also use a
- subquery in parentheses.
+ The JDBC table that should be read from or written into. Note that when using it in the read
+ path, anything that is valid in a FROM clause of a SQL query can be used.
+ For example, instead of a full table you could also use a subquery in parentheses. It is not
+ allowed to specify the `dbtable` and `query` options at the same time.
+ </td>
+ </tr>
+
+ <tr>
+ <td><code>query</code></td>
+ <td>
+ A query that will be used to read data into Spark. The specified query will be parenthesized and used
+ as a subquery in the FROM clause. Spark will also assign an alias to the subquery clause.
+ As an example, Spark will issue a query of the following form to the JDBC source:
+ SELECT <columns> FROM (<user_specified_query>) spark_gen_alias
+ Below are a couple of restrictions on using this option:
+
+ - It is not allowed to specify the `dbtable` and `query` options at the same time.
+ - It is not allowed to specify the `query` and `partitionColumn` options at the same time. When the
+ `partitionColumn` option is required, the subquery can be specified using the `dbtable` option instead,
+ and the partition columns can be qualified using the subquery alias provided as part of `dbtable`.
+ Example:
+
+ spark.read.format("jdbc")
+   .option("dbtable", "(select c1, c2 from t1) as subq")
+   .option("partitionColumn", "subq.c1")
+   .load()
+
</td>
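For orientation, a read that exercises the new `query` option described above could look like the sketch below (the JDBC URL, credentials, and query text are placeholders, and `spark` is assumed to be an active SparkSession; the snippet is illustrative, not part of this change):

    // Spark wraps the supplied query as a parenthesized subquery with a generated alias,
    // i.e. SELECT <columns> FROM (<user_specified_query>) spark_gen_alias
    val salesByRegion = spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql://dbhost:5432/sales")   // placeholder URL
      .option("user", "dbuser")                               // placeholder credentials
      .option("password", "dbpass")
      .option("query", "SELECT region, SUM(amount) AS total FROM orders GROUP BY region")
      .load()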
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index a73a97c06fe5..eea966d30948 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.types.StructType
* Options for the JDBC data source.
*/
class JDBCOptions(
- @transient private val parameters: CaseInsensitiveMap[String])
+ @transient val parameters: CaseInsensitiveMap[String])
extends Serializable {
import JDBCOptions._
@@ -65,11 +65,31 @@ class JDBCOptions(
// Required parameters
// ------------------------------------------------------------
require(parameters.isDefinedAt(JDBC_URL), s"Option '$JDBC_URL' is required.")
- require(parameters.isDefinedAt(JDBC_TABLE_NAME), s"Option '$JDBC_TABLE_NAME' is required.")
// a JDBC URL
val url = parameters(JDBC_URL)
- // name of table
- val table = parameters(JDBC_TABLE_NAME)
+ // table name or a table subquery.
+ val tableOrQuery = (parameters.get(JDBC_TABLE_NAME), parameters.get(JDBC_QUERY_STRING)) match {
+ case (Some(name), Some(subquery)) =>
+ throw new IllegalArgumentException(
+ s"Both '$JDBC_TABLE_NAME' and '$JDBC_QUERY_STRING' can not be specified at the same time."
+ )
+ case (None, None) =>
+ throw new IllegalArgumentException(
+ s"Option '$JDBC_TABLE_NAME' or '$JDBC_QUERY_STRING' is required."
+ )
+ case (Some(name), None) =>
+ if (name.isEmpty) {
+ throw new IllegalArgumentException(s"Option '$JDBC_TABLE_NAME' can not be empty.")
+ } else {
+ name.trim
+ }
+ case (None, Some(subquery)) =>
+ if (subquery.isEmpty) {
+ throw new IllegalArgumentException(s"Option `$JDBC_QUERY_STRING` can not be empty.")
+ } else {
+ s"(${subquery}) __SPARK_GEN_JDBC_SUBQUERY_NAME_${curId.getAndIncrement()}"
+ }
+ }
// ------------------------------------------------------------
// Optional parameters
@@ -109,6 +129,20 @@ class JDBCOptions(
s"When reading JDBC data sources, users need to specify all or none for the following " +
s"options: '$JDBC_PARTITION_COLUMN', '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND', " +
s"and '$JDBC_NUM_PARTITIONS'")
+
+ require(!(parameters.get(JDBC_QUERY_STRING).isDefined && partitionColumn.isDefined),
+ s"""
+ |Options '$JDBC_QUERY_STRING' and '$JDBC_PARTITION_COLUMN' can not be specified together.
+ |Please define the query using `$JDBC_TABLE_NAME` option instead and make sure to qualify
+ |the partition columns using the supplied subquery alias to resolve any ambiguity.
+ |Example:
+ |spark.read.format("jdbc")
+ | .option("dbtable", "(select c1, c2 from t1) as subq")
+ | .option("partitionColumn", "subq.c1"
+ | .load()
+ """.stripMargin
+ )
+
val fetchSize = {
val size = parameters.getOrElse(JDBC_BATCH_FETCH_SIZE, "0").toInt
require(size >= 0,
@@ -149,7 +183,30 @@ class JDBCOptions(
val sessionInitStatement = parameters.get(JDBC_SESSION_INIT_STATEMENT)
}
+class JdbcOptionsInWrite(
+ @transient override val parameters: CaseInsensitiveMap[String])
+ extends JDBCOptions(parameters) {
+
+ import JDBCOptions._
+
+ def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+
+ def this(url: String, table: String, parameters: Map[String, String]) = {
+ this(CaseInsensitiveMap(parameters ++ Map(
+ JDBCOptions.JDBC_URL -> url,
+ JDBCOptions.JDBC_TABLE_NAME -> table)))
+ }
+
+ require(
+ parameters.get(JDBC_TABLE_NAME).isDefined,
+ s"Option '$JDBC_TABLE_NAME' is required. " +
+ s"Option '$JDBC_QUERY_STRING' is not applicable while writing.")
+
+ val table = parameters(JDBC_TABLE_NAME)
+}
+
object JDBCOptions {
+ private val curId = new java.util.concurrent.atomic.AtomicLong(0L)
private val jdbcOptionNames = collection.mutable.Set[String]()
private def newOption(name: String): String = {
@@ -159,6 +216,7 @@ object JDBCOptions {
val JDBC_URL = newOption("url")
val JDBC_TABLE_NAME = newOption("dbtable")
+ val JDBC_QUERY_STRING = newOption("query")
val JDBC_DRIVER_CLASS = newOption("driver")
val JDBC_PARTITION_COLUMN = newOption("partitionColumn")
val JDBC_LOWER_BOUND = newOption("lowerBound")
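To make the option resolution above concrete, here is a rough sketch of the observable behaviour (it assumes the existing `Map[String, String]` convenience constructor of `JDBCOptions`; the numeric suffix of the generated alias comes from the global `curId` counter and is not fixed):

    import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

    // `dbtable` alone: tableOrQuery is just the trimmed table name.
    val withTable = new JDBCOptions(Map("url" -> "jdbc:h2:mem:testdb", "dbtable" -> "TEST.PEOPLE"))
    assert(withTable.tableOrQuery == "TEST.PEOPLE")

    // `query` alone: the query is parenthesized and given a generated alias.
    val withQuery = new JDBCOptions(Map("url" -> "jdbc:h2:mem:testdb", "query" -> "SELECT * FROM TEST.PEOPLE"))
    assert(withQuery.tableOrQuery.startsWith("(SELECT * FROM TEST.PEOPLE) __SPARK_GEN_JDBC_SUBQUERY_NAME_"))

    // Supplying both options, or neither, throws IllegalArgumentException.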
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 0bab3689e5d0..1b3b17c75e75 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -51,7 +51,7 @@ object JDBCRDD extends Logging {
*/
def resolveTable(options: JDBCOptions): StructType = {
val url = options.url
- val table = options.table
+ val table = options.tableOrQuery
val dialect = JdbcDialects.get(url)
val conn: Connection = JdbcUtils.createConnectionFactory(options)()
try {
@@ -296,7 +296,7 @@ private[jdbc] class JDBCRDD(
val myWhereClause = getWhereClause(part)
- val sqlText = s"SELECT $columnList FROM ${options.table} $myWhereClause"
+ val sqlText = s"SELECT $columnList FROM ${options.tableOrQuery} $myWhereClause"
stmt = conn.prepareStatement(sqlText,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
stmt.setFetchSize(options.fetchSize)
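Since this statement is built once per partition with a partition-specific WHERE clause, the documented workaround of passing the subquery through `dbtable` (rather than `query`) composes with partitioned reads. A sketch with placeholder connection details and bounds:

    val partitioned = spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql://dbhost:5432/sales")   // placeholder URL
      .option("dbtable", "(select c1, c2 from t1) as subq")   // subquery goes through dbtable
      .option("partitionColumn", "subq.c1")                   // qualified with the subquery alias
      .option("lowerBound", "1")
      .option("upperBound", "1000")
      .option("numPartitions", "4")
      .load()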
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index b84543ccd786..97e2d255cb7b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -189,12 +189,12 @@ private[sql] case class JDBCRelation(
override def insert(data: DataFrame, overwrite: Boolean): Unit = {
data.write
.mode(if (overwrite) SaveMode.Overwrite else SaveMode.Append)
- .jdbc(jdbcOptions.url, jdbcOptions.table, jdbcOptions.asProperties)
+ .jdbc(jdbcOptions.url, jdbcOptions.tableOrQuery, jdbcOptions.asProperties)
}
override def toString: String = {
val partitioningInfo = if (parts.nonEmpty) s" [numPartitions=${parts.length}]" else ""
// credentials should not be included in the plan output, table information is sufficient.
- s"JDBCRelation(${jdbcOptions.table})" + partitioningInfo
+ s"JDBCRelation(${jdbcOptions.tableOrQuery})" + partitioningInfo
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
index 2b488bb7121d..782d626c1573 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
@@ -59,7 +59,7 @@ class JdbcRelationProvider extends CreatableRelationProvider
mode: SaveMode,
parameters: Map[String, String],
df: DataFrame): BaseRelation = {
- val options = new JDBCOptions(parameters)
+ val options = new JdbcOptionsInWrite(parameters)
val isCaseSensitive = sqlContext.conf.caseSensitiveAnalysis
val conn = JdbcUtils.createConnectionFactory(options)()
@@ -86,7 +86,8 @@ class JdbcRelationProvider extends CreatableRelationProvider
case SaveMode.ErrorIfExists =>
throw new AnalysisException(
- s"Table or view '${options.table}' already exists. SaveMode: ErrorIfExists.")
+ s"Table or view '${options.table}' already exists. " +
+ s"SaveMode: ErrorIfExists.")
case SaveMode.Ignore =>
// With `SaveMode.Ignore` mode, if table already exists, the save operation is expected
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 433443007cfd..b81737eda475 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -67,7 +67,7 @@ object JdbcUtils extends Logging {
/**
* Returns true if the table already exists in the JDBC database.
*/
- def tableExists(conn: Connection, options: JDBCOptions): Boolean = {
+ def tableExists(conn: Connection, options: JdbcOptionsInWrite): Boolean = {
val dialect = JdbcDialects.get(options.url)
// Somewhat hacky, but there isn't a good way to identify whether a table exists for all
@@ -100,7 +100,7 @@ object JdbcUtils extends Logging {
/**
* Truncates a table from the JDBC database without side effects.
*/
- def truncateTable(conn: Connection, options: JDBCOptions): Unit = {
+ def truncateTable(conn: Connection, options: JdbcOptionsInWrite): Unit = {
val dialect = JdbcDialects.get(options.url)
val statement = conn.createStatement
try {
@@ -255,7 +255,7 @@ object JdbcUtils extends Logging {
val dialect = JdbcDialects.get(options.url)
try {
- val statement = conn.prepareStatement(dialect.getSchemaQuery(options.table))
+ val statement = conn.prepareStatement(dialect.getSchemaQuery(options.tableOrQuery))
try {
statement.setQueryTimeout(options.queryTimeout)
Some(getSchema(statement.executeQuery(), dialect))
@@ -809,7 +809,7 @@ object JdbcUtils extends Logging {
df: DataFrame,
tableSchema: Option[StructType],
isCaseSensitive: Boolean,
- options: JDBCOptions): Unit = {
+ options: JdbcOptionsInWrite): Unit = {
val url = options.url
val table = options.table
val dialect = JdbcDialects.get(url)
@@ -838,7 +838,7 @@ object JdbcUtils extends Logging {
def createTable(
conn: Connection,
df: DataFrame,
- options: JDBCOptions): Unit = {
+ options: JdbcOptionsInWrite): Unit = {
val strSchema = schemaString(
df, options.url, options.createTableColumnTypes)
val table = options.table
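Because the write-path helpers now take `JdbcOptionsInWrite`, writes continue to go through `dbtable` only and `query` is rejected when saving. A minimal write sketch (placeholder URL, credentials, and table name; `df` is any DataFrame):

    df.write
      .format("jdbc")
      .option("url", "jdbc:postgresql://dbhost:5432/sales")   // placeholder URL
      .option("user", "dbuser")
      .option("password", "dbpass")
      .option("dbtable", "target_table")                      // `query` is not applicable while writing
      .mode("append")
      .save()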
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 6ea61f02a820..0389273d6cdf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -25,7 +25,7 @@ import org.h2.jdbc.JdbcSQLException
import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
import org.apache.spark.{SparkException, SparkFunSuite}
-import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
+import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.DataSourceScanExec
@@ -39,7 +39,7 @@ import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
-class JDBCSuite extends SparkFunSuite
+class JDBCSuite extends QueryTest
with BeforeAndAfter with PrivateMethodTester with SharedSQLContext {
import testImplicits._
@@ -1099,7 +1099,7 @@ class JDBCSuite extends SparkFunSuite
test("SPARK-19318: Connection properties keys should be case-sensitive.") {
def testJdbcOptions(options: JDBCOptions): Unit = {
// Spark JDBC data source options are case-insensitive
- assert(options.table == "t1")
+ assert(options.tableOrQuery == "t1")
// When we convert it to properties, it should be case-sensitive.
assert(options.asProperties.size == 3)
assert(options.asProperties.get("customkey") == null)
@@ -1255,4 +1255,92 @@ class JDBCSuite extends SparkFunSuite
testIncorrectJdbcPartitionColumn(testH2Dialect.quoteIdentifier("ThEiD"))
}
}
+
+ test("query JDBC option - negative tests") {
+ val query = "SELECT * FROM test.people WHERE theid = 1"
+ // load path
+ val e1 = intercept[RuntimeException] {
+ val df = spark.read.format("jdbc")
+ .option("Url", urlWithUserAndPass)
+ .option("query", query)
+ .option("dbtable", "test.people")
+ .load()
+ }.getMessage
+ assert(e1.contains("Both 'dbtable' and 'query' can not be specified at the same time."))
+
+ // jdbc api path
+ val properties = new Properties()
+ properties.setProperty(JDBCOptions.JDBC_QUERY_STRING, query)
+ val e2 = intercept[RuntimeException] {
+ spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", properties).collect()
+ }.getMessage
+ assert(e2.contains("Both 'dbtable' and 'query' can not be specified at the same time."))
+
+ val e3 = intercept[RuntimeException] {
+ sql(
+ s"""
+ |CREATE OR REPLACE TEMPORARY VIEW queryOption
+ |USING org.apache.spark.sql.jdbc
+ |OPTIONS (url '$url', query '$query', dbtable 'TEST.PEOPLE',
+ | user 'testUser', password 'testPass')
+ """.stripMargin.replaceAll("\n", " "))
+ }.getMessage
+ assert(e3.contains("Both 'dbtable' and 'query' can not be specified at the same time."))
+
+ val e4 = intercept[RuntimeException] {
+ val df = spark.read.format("jdbc")
+ .option("Url", urlWithUserAndPass)
+ .option("query", "")
+ .load()
+ }.getMessage
+ assert(e4.contains("Option `query` can not be empty."))
+
+ // Options query and partitionColumn are not allowed together.
+ val expectedErrorMsg =
+ s"""
+ |Options 'query' and 'partitionColumn' can not be specified together.
+ |Please define the query using `dbtable` option instead and make sure to qualify
+ |the partition columns using the supplied subquery alias to resolve any ambiguity.
+ |Example:
+ |spark.read.format("jdbc")
+ | .option("dbtable", "(select c1, c2 from t1) as subq")
+ | .option("partitionColumn", "subq.c1"
+ | .load()
+ """.stripMargin
+ val e5 = intercept[RuntimeException] {
+ sql(
+ s"""
+ |CREATE OR REPLACE TEMPORARY VIEW queryOption
+ |USING org.apache.spark.sql.jdbc
+ |OPTIONS (url '$url', query '$query', user 'testUser', password 'testPass',
+ | partitionColumn 'THEID', lowerBound '1', upperBound '4', numPartitions '3')
+ """.stripMargin.replaceAll("\n", " "))
+ }.getMessage
+ assert(e5.contains(expectedErrorMsg))
+ }
+
+ test("query JDBC option") {
+ val query = "SELECT name, theid FROM test.people WHERE theid = 1"
+ // Use the query option to pass in the query string.
+ val df = spark.read.format("jdbc")
+ .option("Url", urlWithUserAndPass)
+ .option("query", query)
+ .load()
+ checkAnswer(
+ df,
+ Row("fred", 1) :: Nil)
+
+ // query option in the CREATE TEMPORARY VIEW (DDL) path.
+ sql(
+ s"""
+ |CREATE OR REPLACE TEMPORARY VIEW queryOption
+ |USING org.apache.spark.sql.jdbc
+ |OPTIONS (url '$url', query '$query', user 'testUser', password 'testPass')
+ """.stripMargin.replaceAll("\n", " "))
+
+ checkAnswer(
+ sql("select name, theid from queryOption"),
+ Row("fred", 1) :: Nil)
+
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index 1c2c92d1f073..b751ec2de482 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -293,13 +293,23 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
test("save errors if dbtable is not specified") {
val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
- val e = intercept[RuntimeException] {
+ val e1 = intercept[RuntimeException] {
+ df.write.format("jdbc")
+ .option("url", url1)
+ .options(properties.asScala)
+ .save()
+ }.getMessage
+ assert(e1.contains("Option 'dbtable' or 'query' is required"))
+
+ val e2 = intercept[RuntimeException] {
df.write.format("jdbc")
.option("url", url1)
.options(properties.asScala)
+ .option("query", "select * from TEST.SAVETEST")
.save()
}.getMessage
- assert(e.contains("Option 'dbtable' is required"))
+ val msg = "Option 'dbtable' is required. Option 'query' is not applicable while writing."
+ assert(e2.contains(msg))
}
test("save errors if wrong user/password combination") {