perf: Fall back to Spark if query uses DPP with v1 data sources #897
Conversation
@@ -95,6 +95,10 @@ class CometSparkSessionExtensions
      plan
    } else {
      plan.transform {
        case scanExec: FileSourceScanExec if scanExec.partitionFilters.nonEmpty =>
Not all partition filters are DPP filters; some are probably pushed-down filters. In Spark, DataSourceScanExec has a check we can use:
private def isDynamicPruningFilter(e: Expression): Boolean =
e.exists(_.isInstanceOf[PlanExpression[_]])
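To illustrate why this check distinguishes DPP filters from ordinary pushed-down filters, here is a self-contained sketch. The expression classes below are simplified stand-ins for Spark's `Expression`/`PlanExpression` hierarchy, not the real API; only the shape of the check matches Spark's `isDynamicPruningFilter`:

```scala
// Minimal stand-in expression tree, mimicking Spark's Expression hierarchy
// for illustration only.
sealed trait Expression {
  def children: Seq[Expression] = Seq.empty
  // Mirrors Expression.exists: true if the predicate holds anywhere in the tree.
  def exists(p: Expression => Boolean): Boolean =
    p(this) || children.exists(_.exists(p))
}
case class AttributeRef(name: String) extends Expression
case class EqualTo(left: Expression, right: Expression) extends Expression {
  override def children: Seq[Expression] = Seq(left, right)
}
// Stand-in for Spark's PlanExpression (an expression embedding a query plan),
// which is what marks a dynamic pruning subquery.
case class PlanExpr(planId: Int) extends Expression

// Same shape as DataSourceScanExec.isDynamicPruningFilter.
def isDynamicPruningFilter(e: Expression): Boolean =
  e.exists(_.isInstanceOf[PlanExpr])

// A plain pushed-down partition filter contains no embedded plan, so it is
// not a DPP filter; one embedding a plan expression is.
val pushedDown = EqualTo(AttributeRef("date"), AttributeRef("lit_2024"))
val dpp = EqualTo(AttributeRef("date"), PlanExpr(42))
println(isDynamicPruningFilter(pushedDown)) // false
println(isDynamicPruningFilter(dpp))        // true
```

So `partitionFilters.nonEmpty` alone would over-trigger the fallback: the traversal for an embedded plan expression is what actually identifies DPP.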
Also, do you know how we can perform this check for v2 data sources (BatchScanExec)?
BatchScanExec has runtimeFilters. I think it is similar?
case class BatchScanExec(
...
@transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = {
val dataSourceFilters = runtimeFilters.flatMap {
case DynamicPruningExpression(e) => DataSourceV2Strategy.translateRuntimeFilterV2(e)
case _ => None
}
...
The test in this PR does not trigger DPP when using a v2 data source. I read that DPP is more optimized for v1, but I'm not sure if that is correct.
Hmm, I took a look at DataSourceV2Strategy. It has pushed the DPP filters (i.e., DynamicPruning) down into BatchScanExec (i.e., runtimeFilters):
case PhysicalOperation(project, filters, relation: DataSourceV2ScanRelation) =>
// projection and filters were already pushed down in the optimizer.
// this uses PhysicalOperation to get the projection and ensure that if the batch scan does
// not support columnar, a projection is added to convert the rows to UnsafeRow.
val (runtimeFilters, postScanFilters) = filters.partition {
case _: DynamicPruning => true
case _ => false
}
val batchExec = BatchScanExec(relation.output, relation.scan, runtimeFilters,
relation.ordering, relation.relation.table,
StoragePartitionJoinParams(relation.keyGroupedPartitioning))
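The key step above is the `partition` on the filter list: DPP expressions become the scan's runtime filters while everything else stays behind as post-scan filters. A self-contained sketch of that split, using simplified stand-in filter types rather than Spark's real classes:

```scala
// Simplified stand-ins for Spark's filter expression types, for illustration.
sealed trait Filter
case class DynamicPruning(subqueryId: Int) extends Filter
case class PushedDown(column: String, value: String) extends Filter

// Same shape as the split in DataSourceV2Strategy: DPP filters become the
// scan's runtime filters, everything else remains a post-scan filter.
val filters: Seq[Filter] =
  Seq(PushedDown("region", "EU"), DynamicPruning(1), PushedDown("year", "2024"))

val (runtimeFilters, postScanFilters) = filters.partition {
  case _: DynamicPruning => true
  case _                 => false
}

println(runtimeFilters)  // List(DynamicPruning(1))
println(postScanFilters) // List(PushedDown(region,EU), PushedDown(year,2024))
```

This is why checking `runtimeFilters.nonEmpty` on BatchScanExec would be the v2 analogue of the v1 partition-filter check discussed above.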
@viirya I made this PR specific to v1 data sources for now. Could you review again?
Looks good to me. Thanks @andygrove
Which issue does this PR close?
Partial fix for #895 (only addresses the issue for v1 data sources)
Rationale for this change
Avoid performance regressions in TPC-DS when sales tables are partitioned by date
What changes are included in this PR?
How are these changes tested?