Skip to content

Commit a48e7bb

Browse files
committed
use a wrapper
1 parent 7e69bfb commit a48e7bb

File tree

2 files changed

+26
-14
lines changed

2 files changed

+26
-14
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpressi
2525
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
2626
import org.apache.spark.sql.catalyst.plans.logical._
2727
import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, SupportsNamespaces, TableCapability, TableCatalog, TableChange}
28-
import org.apache.spark.sql.connector.read.V1Scan
2928
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
3029
import org.apache.spark.sql.execution.{FilterExec, LeafExecNode, ProjectExec, RowDataSourceScanExec, SparkPlan}
3130
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
@@ -55,14 +54,12 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
5554

5655
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
5756
case PhysicalOperation(project, filters,
58-
relation @ DataSourceV2ScanRelation(_, v1Scan: V1Scan, output)) =>
59-
val pushedFilters = relation.getTagValue(V2ScanRelationPushDown.PUSHED_FILTERS_TAG)
60-
.getOrElse(Seq.empty)
61-
val v1Relation = v1Scan.toV1TableScan[BaseRelation with TableScan](session.sqlContext)
62-
if (v1Relation.schema != v1Scan.readSchema()) {
57+
relation @ DataSourceV2ScanRelation(_, V1ScanWrapper(scan, translated, pushed), output)) =>
58+
val v1Relation = scan.toV1TableScan[BaseRelation with TableScan](session.sqlContext)
59+
if (v1Relation.schema != scan.readSchema()) {
6360
throw new IllegalArgumentException(
6461
"The fallback v1 relation reports inconsistent schema:\n" +
65-
"Schema of v2 scan: " + v1Scan.readSchema() + "\n" +
62+
"Schema of v2 scan: " + scan.readSchema() + "\n" +
6663
"Schema of v1 relation: " + v1Relation.schema)
6764
}
6865
val rdd = v1Relation.buildScan()
@@ -72,8 +69,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
7269
val dsScan = RowDataSourceScanExec(
7370
output,
7471
requiredColumnsIndex,
75-
pushedFilters.toSet,
76-
pushedFilters.toSet,
72+
translated.toSet,
73+
pushed.toSet,
7774
unsafeRowRDD,
7875
v1Relation,
7976
tableIdentifier = None)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,14 @@ import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpressi
2121
import org.apache.spark.sql.catalyst.planning.ScanOperation
2222
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
2323
import org.apache.spark.sql.catalyst.rules.Rule
24-
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
24+
import org.apache.spark.sql.connector.read.{Scan, V1Scan}
2525
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
26+
import org.apache.spark.sql.sources
27+
import org.apache.spark.sql.types.StructType
2628

2729
object V2ScanRelationPushDown extends Rule[LogicalPlan] {
2830
import DataSourceV2Implicits._
2931

30-
val PUSHED_FILTERS_TAG = TreeNodeTag[Seq[org.apache.spark.sql.sources.Filter]]("pushed_filters")
31-
3232
override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
3333
case ScanOperation(project, filters, relation: DataSourceV2Relation) =>
3434
val scanBuilder = relation.table.asReadable.newScanBuilder(relation.options)
@@ -57,8 +57,14 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] {
5757
|Output: ${output.mkString(", ")}
5858
""".stripMargin)
5959

60-
val scanRelation = DataSourceV2ScanRelation(relation.table, scan, output)
61-
scanRelation.setTagValue(PUSHED_FILTERS_TAG, pushedFilters)
60+
val wrappedScan = scan match {
61+
case v1: V1Scan =>
62+
val translated = filters.flatMap(DataSourceStrategy.translateFilter)
63+
V1ScanWrapper(v1, translated, pushedFilters)
64+
case _ => scan
65+
}
66+
67+
val scanRelation = DataSourceV2ScanRelation(relation.table, wrappedScan, output)
6268

6369
val projectionOverSchema = ProjectionOverSchema(output.toStructType)
6470
val projectionFunc = (expr: Expression) => expr transformDown {
@@ -81,3 +87,12 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] {
8187
withProjection
8288
}
8389
}
90+
91+
// A wrapper for v1 scan to carry the translated filters and the handled ones. This is required by
92+
// the physical v1 scan node.
93+
case class V1ScanWrapper(
94+
v1Scan: V1Scan,
95+
translatedFilters: Seq[sources.Filter],
96+
handledFilters: Seq[sources.Filter]) extends Scan {
97+
override def readSchema(): StructType = v1Scan.readSchema()
98+
}

0 commit comments

Comments
 (0)