diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index e815e5bd516e2..4b013c633e27c 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1435,6 +1435,13 @@ the following case-insensitive options:
The custom schema to use for reading data from JDBC connectors. For example, "id DECIMAL(38, 0), name STRING". You can also specify partial fields, and the others use the default type mapping. For example, "id DECIMAL(38, 0)". The column names should be identical to the corresponding column names of JDBC table. Users can specify the corresponding data types of Spark SQL instead of using the defaults. This option applies only to reading.
+
+  <tr>
+    <td><code>pushDownPredicate</code></td>
+    <td>
+     The option to enable or disable predicate push-down into the JDBC data source. The default value is true, in which case Spark will push down filters to the JDBC data source as much as possible. Otherwise, if set to false, no filter will be pushed down to the JDBC data source and thus all filters will be handled by Spark. Predicate push-down is usually turned off when the predicate filtering is performed faster by Spark than by the JDBC data source.
+    </td>
+  </tr>
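
For reference, the new option is set like any other JDBC reader option. A minimal sketch, assuming a placeholder H2 URL and table name that are not part of this patch:

```scala
// Read over JDBC with predicate push-down disabled; Spark applies the filter
// itself instead of embedding it in the SQL sent to the database.
val people = spark.read
  .format("jdbc")
  .option("url", "jdbc:h2:mem:testdb;user=testUser;password=testPass")  // placeholder URL
  .option("dbtable", "TEST.PEOPLE")                                     // placeholder table
  .option("pushDownPredicate", "false")
  .load()
  .filter("theid = 1")
```

Leaving the option at its default (true) lets Spark compile the predicate into the JDBC query, which is the pre-existing behavior.
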
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index 574aed4958fd7..d80efcedf8c2d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -183,6 +183,9 @@ class JDBCOptions(
}
// An option to execute custom SQL before fetching data from the remote DB
val sessionInitStatement = parameters.get(JDBC_SESSION_INIT_STATEMENT)
+
+ // An option to allow/disallow pushing down predicates into the JDBC data source
+ val pushDownPredicate = parameters.getOrElse(JDBC_PUSHDOWN_PREDICATE, "true").toBoolean
}
class JdbcOptionsInWrite(
@@ -234,4 +237,5 @@ object JDBCOptions {
val JDBC_BATCH_INSERT_SIZE = newOption("batchsize")
val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel")
val JDBC_SESSION_INIT_STATEMENT = newOption("sessionInitStatement")
+ val JDBC_PUSHDOWN_PREDICATE = newOption("pushDownPredicate")
}
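
The parsing above follows the same pattern as the other boolean JDBC options: a missing key falls back to "true", and the string value is parsed with Scala's `toBoolean` (JDBCOptions keeps its parameters in a case-insensitive map, so the key's casing does not matter). A standalone sketch of that behavior, using a plain Map rather than Spark's internal type:

```scala
// Illustration of the getOrElse(...).toBoolean pattern used for JDBC_PUSHDOWN_PREDICATE.
val withOption    = Map("url" -> "jdbc:h2:mem:testdb", "pushDownPredicate" -> "false")
val withoutOption = Map("url" -> "jdbc:h2:mem:testdb")

withOption.getOrElse("pushDownPredicate", "true").toBoolean     // false: push-down disabled
withoutOption.getOrElse("pushDownPredicate", "true").toBoolean  // true: the default

// "TRUE"/"False" parse fine (toBoolean is case-insensitive), but a value such as "yes"
// throws IllegalArgumentException rather than silently falling back to the default.
```
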
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index 97e2d255cb7be..4f78f593fa4af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -172,7 +172,11 @@ private[sql] case class JDBCRelation(
// Check if JDBCRDD.compileFilter can accept input filters
override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
- filters.filter(JDBCRDD.compileFilter(_, JdbcDialects.get(jdbcOptions.url)).isEmpty)
+ if (jdbcOptions.pushDownPredicate) {
+ filters.filter(JDBCRDD.compileFilter(_, JdbcDialects.get(jdbcOptions.url)).isEmpty)
+ } else {
+ filters
+ }
}
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
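
The change relies on the `unhandledFilters` contract of the data source API: any filter a relation returns from this method is kept as a Filter node in the physical plan and re-evaluated by Spark, so returning every filter effectively disables push-down while leaving correctness to Spark. A minimal illustrative relation, hypothetical and not part of this patch, that declines all filters:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Hypothetical relation mirroring what JDBCRelation.unhandledFilters does
// when pushDownPredicate is false.
class NoPushdownRelation(override val sqlContext: SQLContext)
  extends BaseRelation with PrunedFilteredScan {

  override def schema: StructType = StructType(Seq(StructField("id", IntegerType)))

  // Every filter is reported back as unhandled, so Spark keeps a Filter node
  // above the scan and evaluates the predicates itself.
  override def unhandledFilters(filters: Array[Filter]): Array[Filter] = filters

  // Filters are still passed in here; a source may use them as a best-effort
  // optimization even when it reports them as unhandled.
  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] =
    sqlContext.sparkContext.parallelize(Seq(Row(1), Row(2)))
}
```

With pushDownPredicate left at true, the method behaves as before: only the filters that JDBCRDD.compileFilter cannot translate into SQL are reported as unhandled.
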
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 09facb9bef8dc..0edbd3a55e17e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -261,21 +261,32 @@ class JDBCSuite extends QueryTest
s"Expecting a JDBCRelation with $expectedNumPartitions partitions, but got:`$jdbcRelations`")
}
+ private def checkPushdown(df: DataFrame): DataFrame = {
+ val parentPlan = df.queryExecution.executedPlan
+ // Check if SparkPlan Filter is removed in a physical plan and
+ // the plan only has PhysicalRDD to scan JDBCRelation.
+ assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
+ val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
+ assert(node.child.isInstanceOf[org.apache.spark.sql.execution.DataSourceScanExec])
+ assert(node.child.asInstanceOf[DataSourceScanExec].nodeName.contains("JDBCRelation"))
+ df
+ }
+
+ private def checkNotPushdown(df: DataFrame): DataFrame = {
+ val parentPlan = df.queryExecution.executedPlan
+ // Check if SparkPlan Filter is not removed in a physical plan because JDBCRDD
+ // cannot compile given predicates.
+ assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
+ val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
+ assert(node.child.isInstanceOf[org.apache.spark.sql.execution.FilterExec])
+ df
+ }
+
test("SELECT *") {
assert(sql("SELECT * FROM foobar").collect().size === 3)
}
test("SELECT * WHERE (simple predicates)") {
- def checkPushdown(df: DataFrame): DataFrame = {
- val parentPlan = df.queryExecution.executedPlan
- // Check if SparkPlan Filter is removed in a physical plan and
- // the plan only has PhysicalRDD to scan JDBCRelation.
- assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
- val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
- assert(node.child.isInstanceOf[org.apache.spark.sql.execution.DataSourceScanExec])
- assert(node.child.asInstanceOf[DataSourceScanExec].nodeName.contains("JDBCRelation"))
- df
- }
assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)
assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID != 2")).collect().size == 2)
assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1")).collect().size == 1)
@@ -308,15 +319,6 @@ class JDBCSuite extends QueryTest
"WHERE (THEID > 0 AND TRIM(NAME) = 'mary') OR (NAME = 'fred')")
assert(df2.collect.toSet === Set(Row("fred", 1), Row("mary", 2)))
- def checkNotPushdown(df: DataFrame): DataFrame = {
- val parentPlan = df.queryExecution.executedPlan
- // Check if SparkPlan Filter is not removed in a physical plan because JDBCRDD
- // cannot compile given predicates.
- assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
- val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
- assert(node.child.isInstanceOf[org.apache.spark.sql.execution.FilterExec])
- df
- }
assert(checkNotPushdown(sql("SELECT * FROM foobar WHERE (THEID + 1) < 2")).collect().size == 0)
assert(checkNotPushdown(sql("SELECT * FROM foobar WHERE (THEID + 2) != 4")).collect().size == 2)
}
@@ -1375,4 +1377,30 @@ class JDBCSuite extends QueryTest
Row("fred", 1) :: Nil)
}
+
+ test("SPARK-24288: Enable preventing predicate pushdown") {
+ val table = "test.people"
+
+ val df = spark.read.format("jdbc")
+ .option("Url", urlWithUserAndPass)
+ .option("dbTable", table)
+ .option("pushDownPredicate", false)
+ .load()
+ .filter("theid = 1")
+ .select("name", "theid")
+ checkAnswer(
+ checkNotPushdown(df),
+ Row("fred", 1) :: Nil)
+
+ // pushDownPredicate option in the create table path.
+ sql(
+ s"""
+ |CREATE OR REPLACE TEMPORARY VIEW predicateOption
+ |USING org.apache.spark.sql.jdbc
+ |OPTIONS (url '$urlWithUserAndPass', dbTable '$table', pushDownPredicate 'false')
+ """.stripMargin.replaceAll("\n", " "))
+ checkAnswer(
+ checkNotPushdown(sql("SELECT name, theid FROM predicateOption WHERE theid = 1")),
+ Row("fred", 1) :: Nil)
+ }
}
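
Beyond the plan-inspection helpers above, the effect is easy to see interactively with `explain()`. A hedged sketch using the suite's H2 table and column names (`urlWithUserAndPass` is the test helper URL; substitute your own connection string elsewhere):

```scala
val reader = spark.read.format("jdbc")
  .option("url", urlWithUserAndPass)   // as in JDBCSuite
  .option("dbtable", "test.people")

// Default behavior: the predicate is compiled into the JDBC query, and the scan
// node should list it under PushedFilters in the explain output.
reader.load().filter("theid = 1").explain()

// With push-down disabled, PushedFilters should be empty and a Filter node remains
// in the physical plan, i.e. Spark evaluates theid = 1 after fetching the rows.
reader.option("pushDownPredicate", "false").load().filter("theid = 1").explain()
```
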