7 changes: 7 additions & 0 deletions docs/sql-programming-guide.md
@@ -1435,6 +1435,13 @@ the following case-insensitive options:
The custom schema to use for reading data from JDBC connectors. For example, <code>"id DECIMAL(38, 0), name STRING"</code>. You can also specify partial fields, and the others use the default type mapping. For example, <code>"id DECIMAL(38, 0)"</code>. The column names should be identical to the corresponding column names of the JDBC table. Users can specify the corresponding Spark SQL data types instead of using the defaults. This option applies only to reading.
</td>
</tr>

<tr>
<td><code>pushDownPredicate</code></td>
<td>
The option to enable or disable predicate push-down into the JDBC data source. The default value is true, in which case Spark pushes down filters to the JDBC data source as much as possible. If set to false, no filter is pushed down to the JDBC data source, and all filters are handled by Spark. Predicate push-down is usually turned off when the predicate filtering is performed faster by Spark than by the JDBC data source.
</td>
</tr>
</table>

<div class="codetabs">
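For reference, a minimal sketch of how a user would set the new option through the DataFrameReader API; the JDBC URL, table name, and credentials below are placeholders, not values from this change:

```scala
import org.apache.spark.sql.SparkSession

// Local session only for the sake of a runnable sketch.
val spark = SparkSession.builder()
  .appName("jdbc-pushdown-example")
  .master("local[*]")
  .getOrCreate()

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://dbserver:5432/mydb") // placeholder URL
  .option("dbtable", "schema.tablename")                  // placeholder table
  .option("user", "username")
  .option("password", "password")
  .option("pushDownPredicate", "false") // disable predicate push-down for this read
  .load()

// With push-down disabled, this filter stays in Spark's physical plan
// (FilterExec) instead of being compiled into the remote JDBC query.
jdbcDF.filter("id = 1").show()
```

The equivalent SQL path, where pushDownPredicate is passed in the OPTIONS clause of CREATE TEMPORARY VIEW, is exercised by the new test at the end of JDBCSuite below.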
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -183,6 +183,9 @@ class JDBCOptions(
}
// An option to execute custom SQL before fetching data from the remote DB
val sessionInitStatement = parameters.get(JDBC_SESSION_INIT_STATEMENT)

// An option to allow/disallow pushing down predicate into JDBC data source
val pushDownPredicate = parameters.getOrElse(JDBC_PUSHDOWN_PREDICATE, "true").toBoolean

Reviewer: Super nit: shouldn't it be plural, pushDownPredicates? There may be many predicates.

maryannxue (Contributor Author): Or one could argue that "predicate" is a notion of all the filters as a whole. It's a nice reminder, though; I hadn't thought about it. Anyway, I just checked: we use PushDownPredicate and the singular form in similar rules, so maybe we keep it singular here too?

Reviewer: Yeah, consistency is a very good argument :) Indeed, it will be plural or not depending on which side we look at it from.

}

class JdbcOptionsInWrite(
@@ -234,4 +237,5 @@ object JDBCOptions {
val JDBC_BATCH_INSERT_SIZE = newOption("batchsize")
val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel")
val JDBC_SESSION_INIT_STATEMENT = newOption("sessionInitStatement")
val JDBC_PUSHDOWN_PREDICATE = newOption("pushDownPredicate")
}
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -172,7 +172,11 @@ private[sql] case class JDBCRelation(

// Check if JDBCRDD.compileFilter can accept input filters
override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
filters.filter(JDBCRDD.compileFilter(_, JdbcDialects.get(jdbcOptions.url)).isEmpty)
if (jdbcOptions.pushDownPredicate) {

Reviewer: Are you sure that this is the only place? JDBCRDD.scanTable defines filters as all the filters that may be pushed down. Probably we should use filters -- unhandledFilters in JdbcRelation.buildScan.

maryannxue (Contributor Author), Jul 25, 2018: Yes, this is the only source of truth for defining handled/unhandled. The caller (the physical rules) calls this method and pushes the "handled" ones to JdbcRelation.buildScan.

Reviewer: I see now, my mistake. Thanks for the clarification :)

maryannxue (Contributor Author): No, I share your opinion actually. It is confusing here... maybe we should change the parameter names at some point.

Reviewer: Indeed it's confusing. The buildScan argument could be named pushedFilters, and the variables too; then the code would be self-describing.

filters.filter(JDBCRDD.compileFilter(_, JdbcDialects.get(jdbcOptions.url)).isEmpty)
} else {
filters
}
}

override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
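To illustrate the contract discussed in the thread above, here is a small self-contained sketch (toy types only, not Spark's actual Filter or relation classes) of how unhandledFilters decides which predicates are pushed to the source and which stay in Spark's own Filter operator:

```scala
// Toy model of the handled/unhandled split; all names here are illustrative.
object FilterSplitSketch {
  sealed trait Filter
  case class EqualTo(col: String, value: Any) extends Filter
  // Stands in for a predicate that JDBCRDD.compileFilter could not translate,
  // e.g. "(THEID + 1) < 2" in the tests in this PR.
  case class Uncompilable(expr: String) extends Filter

  // Stand-in for JDBCRelation.unhandledFilters: when pushDownPredicate is
  // false, every filter is reported back as unhandled, so Spark keeps them all.
  def unhandledFilters(filters: Seq[Filter], pushDownPredicate: Boolean): Seq[Filter] =
    if (pushDownPredicate) filters.collect { case f: Uncompilable => f }
    else filters

  def main(args: Array[String]): Unit = {
    val all = Seq(EqualTo("THEID", 1), Uncompilable("(THEID + 1) < 2"))
    Seq(true, false).foreach { push =>
      val unhandled = unhandledFilters(all, push)
      val pushed = all.filterNot(unhandled.contains) // what buildScan would receive
      println(s"pushDownPredicate=$push  pushed=$pushed  keptInSpark=$unhandled")
    }
  }
}
```

When pushDownPredicate is true, only the uncompilable predicate stays in Spark; when it is false, both predicates do, which is what the new test below asserts by expecting a FilterExec node in the physical plan.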
66 changes: 47 additions & 19 deletions sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -261,21 +261,32 @@ class JDBCSuite extends QueryTest
s"Expecting a JDBCRelation with $expectedNumPartitions partitions, but got:`$jdbcRelations`")
}

private def checkPushdown(df: DataFrame): DataFrame = {
val parentPlan = df.queryExecution.executedPlan
// Check if SparkPlan Filter is removed in a physical plan and
// the plan only has PhysicalRDD to scan JDBCRelation.
assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
assert(node.child.isInstanceOf[org.apache.spark.sql.execution.DataSourceScanExec])
assert(node.child.asInstanceOf[DataSourceScanExec].nodeName.contains("JDBCRelation"))
df
}

private def checkNotPushdown(df: DataFrame): DataFrame = {
val parentPlan = df.queryExecution.executedPlan
// Check if SparkPlan Filter is not removed in a physical plan because JDBCRDD
// cannot compile given predicates.
assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
assert(node.child.isInstanceOf[org.apache.spark.sql.execution.FilterExec])
df
}

test("SELECT *") {
assert(sql("SELECT * FROM foobar").collect().size === 3)
}

test("SELECT * WHERE (simple predicates)") {
def checkPushdown(df: DataFrame): DataFrame = {
val parentPlan = df.queryExecution.executedPlan
// Check if SparkPlan Filter is removed in a physical plan and
// the plan only has PhysicalRDD to scan JDBCRelation.
assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
assert(node.child.isInstanceOf[org.apache.spark.sql.execution.DataSourceScanExec])
assert(node.child.asInstanceOf[DataSourceScanExec].nodeName.contains("JDBCRelation"))
df
}
assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)
assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID != 2")).collect().size == 2)
assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1")).collect().size == 1)
@@ -308,15 +319,6 @@ class JDBCSuite extends QueryTest
"WHERE (THEID > 0 AND TRIM(NAME) = 'mary') OR (NAME = 'fred')")
assert(df2.collect.toSet === Set(Row("fred", 1), Row("mary", 2)))

def checkNotPushdown(df: DataFrame): DataFrame = {
val parentPlan = df.queryExecution.executedPlan
// Check if SparkPlan Filter is not removed in a physical plan because JDBCRDD
// cannot compile given predicates.
assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
assert(node.child.isInstanceOf[org.apache.spark.sql.execution.FilterExec])
df
}
assert(checkNotPushdown(sql("SELECT * FROM foobar WHERE (THEID + 1) < 2")).collect().size == 0)
assert(checkNotPushdown(sql("SELECT * FROM foobar WHERE (THEID + 2) != 4")).collect().size == 2)
}
@@ -1375,4 +1377,30 @@ class JDBCSuite extends QueryTest
Row("fred", 1) :: Nil)

}

test("SPARK-24288: Enable preventing predicate pushdown") {
val table = "test.people"

val df = spark.read.format("jdbc")
.option("Url", urlWithUserAndPass)
.option("dbTable", table)
.option("pushDownPredicate", false)
.load()
.filter("theid = 1")
.select("name", "theid")
checkAnswer(
checkNotPushdown(df),
Row("fred", 1) :: Nil)

// pushDownPredicate option in the create table path.
sql(
s"""
|CREATE OR REPLACE TEMPORARY VIEW predicateOption
|USING org.apache.spark.sql.jdbc
|OPTIONS (url '$urlWithUserAndPass', dbTable '$table', pushDownPredicate 'false')
""".stripMargin.replaceAll("\n", " "))
checkAnswer(
checkNotPushdown(sql("SELECT name, theid FROM predicateOption WHERE theid = 1")),
Row("fred", 1) :: Nil)
}
}