[SPARK-52187][SQL] Introduce Join pushdown for DSv2 #50921
Conversation
}

public String[] qualifier;
public String name;
why do we need to separate qualifier and name? I think JoinColumn should be the same as NamedReference with an additional isInLeftSideOfJoin flag.
We could have a similar implementation to FieldReference, with parts and isInLeftSideOfJoin fields, but I find the separation between qualifier and name nicer because it makes the code cleaner in some ways. If you take a look at JDBCScanBuilder.pushJoin, we are passing a condition that contains JoinColumns as leaf expressions, but these are not yet qualified. I am qualifying them later on, in the qualifyCondition method. Without the qualifier/name separation, and with parts: Seq[String], I would need to do array shifting, which is fine, but I just find it nicer my way.
I can however change the implementation of JoinColumn to be something like:
private[sql] final case class JoinColumn(
    parts: Seq[String],
    isInLeftSideOfJoin: Boolean) extends NamedReference {
  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper

  override def fieldNames(): Array[String] = parts.toArray
  override def toString: String = parts.quoted
}
Honestly, I am fine with both approaches.
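To make the trade-off concrete, here is a minimal sketch of the two variants (case-class stand-ins with assumed field names, not the PR's actual classes) and what qualifying a not-yet-qualified column looks like in each:

// Variant 1: qualifier/name split. Qualifying is a plain field replacement.
case class JoinColumnQN(qualifier: Seq[String], name: String, isInLeftSideOfJoin: Boolean)

def qualifyQN(col: JoinColumnQN, alias: Seq[String]): JoinColumnQN =
  col.copy(qualifier = alias)

// Variant 2: a single parts field, like FieldReference. Qualifying has to drop the old
// qualifier and keep only the last part -- the "array shifting" mentioned above.
case class JoinColumnParts(parts: Seq[String], isInLeftSideOfJoin: Boolean)

def qualifyParts(col: JoinColumnParts, alias: Seq[String]): JoinColumnParts =
  col.copy(parts = alias :+ col.parts.last)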
import java.util.Map;

/**
 * The builder to generate SQL for specific Join type.
I'm wondering if this is really needed. The join type string is quite simple, and Spark doesn't need to provide a helper to do it.
It might be redundant. The reason I have it is simply to answer the following question: what if some dialect names a specific join type differently? For example, what if there is a dialect that doesn't support the CROSS JOIN syntax but only JOIN? We can get the same effect with just string comparison in the dialects, so we can get rid of it if you find it to be overkill.
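A minimal sketch of that string-comparison alternative (the function name is illustrative, not part of the actual dialect API):

// Hypothetical dialect hook: remap the join type keyword for engines whose syntax differs,
// e.g. one that has no explicit CROSS JOIN and only accepts plain JOIN.
def joinTypeToSQL(sparkJoinTypeSQL: String): String = sparkJoinTypeSQL match {
  case "CROSS JOIN" => "JOIN"
  case other => other
}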
@@ -174,6 +178,12 @@ protected String visitNamedReference(NamedReference namedRef) {
    return namedRef.toString();
  }

  protected String visitJoinColumn(JoinColumn column) {
shall we fail by default? the implementations must provide the left/right side alias as a context, in order to generate the column name.
Not really. The way I designed this is that the left/right side alias is already set on the JoinColumn before visiting it, so I think this implementation is valid.
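A minimal sketch of that default behaviour as the design stood at this point (Scala for brevity; the real method is the Java visitJoinColumn shown above, and the accessor names are assumptions):

import org.apache.spark.sql.connector.join.JoinColumn

// The left/right subquery alias is already stored as the column's qualifier before the
// visitor runs, so the default visit can simply dot-join qualifier and name.
def renderJoinColumn(column: JoinColumn): String =
  (column.qualifier :+ column.name).mkString(".")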
// SALARY#0, NAME#1, DEPT#1. This is done by adding projection with appropriate aliases.
val projectList = realOutput.zip(holder.output).map { case (a1, a2) =>
  val originalName = holder.exprIdToOriginalName(a2.exprId)
  Alias(a1, originalName)(a2.exprId)
is originalName always a2.name?
No. a2 comes from holder.output, which will have aliased names in the format subquery_x_col_y. The original names are saved into sHolder at the time of its creation in createScanBuilder.
Does that answer your question?
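A minimal sketch of that bookkeeping (names follow the snippet above; the surrounding code, including relation, is assumed):

import org.apache.spark.sql.catalyst.expressions.ExprId

// At createScanBuilder time: remember the original column names keyed by expression id,
// while the holder's own output later carries generated aliases like subquery_0_col_1.
val exprIdToOriginalName: Map[ExprId, String] =
  relation.output.map(a => a.exprId -> a.name).toMap

// Later, the original names are restored by re-aliasing, as in the snippet above:
//   Alias(a1, exprIdToOriginalName(a2.exprId))(a2.exprId)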
@@ -573,6 +701,13 @@ case class ScanBuilderHolder(
  var pushedAggregate: Option[Aggregation] = None

  var pushedAggOutputMap: AttributeMap[Expression] = AttributeMap.empty[Expression]

  var joinedRelations: Seq[DataSourceV2RelationBase] = Seq()
does joinedRelations.isEmpty indicate isJoinPushed as false?
Yes, we can reuse joinedRelations.isEmpty instead of isJoinPushed. I will make that change.
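That simplification would look roughly like this on the holder (a one-line sketch with an assumed method name):

// Derive the flag from the collected relations instead of tracking a separate boolean.
def isJoinPushedDown: Boolean = joinedRelations.nonEmpty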
@cloud-fan Shall we support join pushdown for DSv2?
import org.apache.spark.annotation.Evolving;

/**
 * Base class of the public Join type API.
Please correct this comment.
@@ -0,0 +1,23 @@
/*
I plan to add other types of joins as well.
val newSchema = leftHolder.builder.build().readSchema()
val newOutput = (leftProjections ++ rightProjections).asInstanceOf[Seq[AttributeReference]]
  .zip(newSchema.fields)
We should fail if the number of columns doesn't match between Spark and the third-party data source.
also check the data type.
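A minimal sketch of the requested validation, reusing the names from the snippet above (the exact error type is an assumption): fail fast when the pushed-down scan reports a different number of columns, or different types, than Spark expects.

import org.apache.spark.sql.catalyst.expressions.AttributeReference

// newSchema is the readSchema() of the builder after the join was pushed, as above.
val expected = (leftProjections ++ rightProjections).asInstanceOf[Seq[AttributeReference]]
if (expected.length != newSchema.fields.length) {
  throw new IllegalStateException(
    s"Join pushdown returned ${newSchema.fields.length} columns, expected ${expected.length}")
}
expected.zip(newSchema.fields).foreach { case (attr, field) =>
  if (attr.dataType != field.dataType) { // a nullability-tolerant comparison could be used instead
    throw new IllegalStateException(
      s"Join pushdown type mismatch for ${field.name}: ${field.dataType} vs ${attr.dataType}")
  }
}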
)) {
  leftHolder.joinedRelations = leftHolder.joinedRelations :+ rightHolder.relation

  val newSchema = leftHolder.builder.build().readSchema()
let's not build the scan too early here, or call it an extra time.
I have introduced a new API on the join interface that returns the new schema after the join is pushed down.
val conditionString = condition.toScala match {
  case Some(cond) =>
    qualifyCondition(cond, leftSideQualifier, rightSideQualifier)
    s"ON ${dialect.compileExpression(cond).get}"
I think it's safer to pass the generated subquery aliases to the compileExpression function (or add a new compileJoinCondition function), which should respect the aliases when generating SQL for JoinColumn. It's better than making JoinColumn mutable.
Adding a new method is a bit tricky because we would need to fall back to compileExpression in case the expression is not a JoinColumn, and we can't easily call the overridden compileExpression methods.
I went with expanding JDBCSQLBuilder with optional left and right qualifiers. It seems like overkill just to support JoinColumn, but I think this is the safest way.
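A minimal sketch of that approach (the class name and the JoinColumn accessors are assumptions; the real change extends JDBCSQLBuilder): the builder is constructed with the generated subquery aliases and applies them when it meets a JoinColumn, so JoinColumn itself stays immutable.

import org.apache.spark.sql.connector.join.JoinColumn
import org.apache.spark.sql.jdbc.JdbcDialect

class QualifierAwareSQLBuilder(
    dialect: JdbcDialect,
    leftQualifier: Option[String],
    rightQualifier: Option[String]) {

  // Pick the alias for the side the column belongs to and prepend it to the name.
  def visitJoinColumn(column: JoinColumn): String = {
    val qualifier = if (column.isInLeftSideOfJoin) leftQualifier else rightQualifier
    (qualifier.toSeq :+ column.name).map(dialect.quoteIdentifier).mkString(".")
  }
}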
String[] fullyQualified = new String[qualifier.length + 1];
System.arraycopy(qualifier, 0, fullyQualified, 0, qualifier.length);
fullyQualified[qualifier.length] = name;
return fullyQualified;
It seems JoinColumn is an immutable class; should we cache fullyQualified instead of copying the array here, in case this is called many times?
I have changed the way JoinColumn works. It doesn't have the qualifier anymore, only the name. The third-party connector should handle the qualifiers, similarly to how it's done in JDBCSQLBuilder.
  this.leftSideOfJoin = leftSideOfJoin;
}

private String name;
shall we also use String[] in case we want to support join conditions with nested columns in the future?
What changes were proposed in this pull request?
With this PR I am introducing the join pushdown interface for DSv2 connectors and its implementation for JDBC connectors.
The interface itself, SupportsPushDownJoin, has the following API:
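(A minimal sketch of its shape, written in Scala for brevity; the actual interface is the Java file sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownJoin.java, the method names below come from this PR's discussion, and the parameter types are assumptions.)

import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.connector.join.JoinType
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.types.StructType

// Sketch only: the actual Java interface in this PR may declare different signatures.
trait SupportsPushDownJoin extends ScanBuilder {
  // Whether the other side's ScanBuilder reads from a compatible source, so that the
  // join can be attempted at the source.
  def isRightSideCompatibleForJoin(other: SupportsPushDownJoin): Boolean

  // Try to push the join down; returning false keeps the join in the Spark plan.
  def pushJoin(other: SupportsPushDownJoin, joinType: JoinType, condition: Predicate): Boolean

  // The schema of this ScanBuilder's output after the join has been pushed down.
  def getOutputSchema(): StructType
}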
If isRightSideCompatibleForJoin returns true, the join will be attempted to be pushed down (it can still fail, though). getOutputSchema returns the new schema of the ScanBuilder after the join has been pushed down.
With this implementation, only inner joins are supported; left and right joins should be added as well. Cross joins won't be supported since they can increase the amount of data that is being read.
Also, none of the dialects currently supports join pushdown; it is only available for the H2 dialect. The join pushdown capability is guarded by the SQL conf spark.sql.optimizer.datasourceV2JoinPushdown, the JDBC option pushDownJoin, and the JDBC dialect method supportsJoin.
For a JDBC query that joins two tables from the same source, the SQL generated on the Spark side contains the whole join, so the remote database executes it as a single query; see the illustration below.
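As a hypothetical illustration (catalog, table and column names are made up; this is not the PR's original example), joining two tables served by the same H2-backed JDBC catalog:

// Enable DSv2 join pushdown (the conf and the H2 dialect support are part of this PR).
spark.conf.set("spark.sql.optimizer.datasourceV2JoinPushdown", "true")

// Assumes a JDBC catalog named "h2" has already been configured to point at the database.
val joined = spark.sql(
  """SELECT e.NAME, d.DEPT_NAME
    |FROM h2.test.EMPLOYEE e
    |JOIN h2.test.DEPARTMENT d ON e.DEPT = d.ID""".stripMargin)

// With the join pushed down, the scan issued to the database is a single query shaped
// roughly like:
//   SELECT ... FROM (SELECT ... FROM "test"."EMPLOYEE") subquery_0
//   JOIN (SELECT ... FROM "test"."DEPARTMENT") subquery_1
//   ON subquery_0."DEPT" = subquery_1."ID"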
Why are the changes needed?
DSv2 connectors can't push down the join operator.
Does this PR introduce any user-facing change?
This PR itself does not, since the behaviour is not enabled for any of the connectors (besides H2, which is a testing JDBC dialect).
How was this patch tested?
New tests and some local testing with TPCDS queries.
Was this patch authored or co-authored using generative AI tooling?