[SPARK-28939][SQL] Propagate SQLConf for plans executed by toRdd #25643
Conversation
cc @cloud-fan
Test build #109994 has finished for PR 25643 at commit
```scala
// If we are in the context of a tracked SQL operation, `SQLExecution.EXECUTION_ID_KEY` is set
// and we have nothing to do here. Otherwise, we use the `SQLConf` captured at the creation of
// this RDD.
if (context.getLocalProperty("spark.sql.execution.id") == null) {
```
nit: use `SQLExecution.EXECUTION_ID_KEY` instead of hardcoding it
I cannot, because `SQLExecution` is in core and here we are in catalyst, so we are missing the dependency.
```scala
override def clearDependencies() {
```
is this really needed? `sqlRDD` is a constructor parameter which won't be kept as a member variable.
```diff
@@ -105,7 +105,7 @@ class QueryExecution(
    * Given QueryExecution is not a public class, end users are discouraged to use this: please
    * use `Dataset.rdd` instead where conversion will be applied.
    */
-  lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
+  lazy val toRdd: RDD[InternalRow] = new SQLExecutionRDD(executedPlan.execute(), SQLConf.get)
```
can we get the conf from the spark session?
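Stripped of Spark internals, the shape of this change can be sketched as a wrapper that snapshots a config at creation time on the driver and falls back to it whenever the wrapped computation later runs outside a tracked execution. All names here (`ConfPreservingComputation`, `run`) are illustrative, not Spark's API:

```scala
// Illustrative sketch, not Spark code: snapshot a conf when the wrapper is
// built, and use it only when no execution id is present in the context.
final class ConfPreservingComputation[T](
    body: Map[String, String] => T,        // the underlying computation
    capturedConf: Map[String, String]) {   // snapshot taken at creation time

  def run(executionId: Option[String], contextConf: Map[String, String]): T =
    executionId match {
      // A tracked SQL execution already propagates its conf; nothing to do.
      case Some(_) => body(contextConf)
      // Otherwise apply the conf captured when the wrapper was created.
      case None => body(capturedConf)
    }
}
```

With `executionId = None` (the analogue of the `.rdd`/`toRdd` case), `run` sees the captured settings instead of an empty context.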
```scala
case class FakeQueryExecution(spark: SparkSession, physicalPlan: SparkPlan)
  extends QueryExecution(spark, LocalRelation()) {
```
nit: 2-space indentation
seems like a good idea, also cc @hvanhovell
```scala
 * @param sqlRDD the `RDD` generated by the SQL plan
 * @param conf the `SQLConf` to apply to the execution of the SQL plan
 */
class SQLExecutionRDD(
```
Why is this in catalyst and not in core?
thanks, I'm moving it
Test build #110005 has finished for PR 25643 at commit
Test build #110015 has finished for PR 25643 at commit
```scala
// this RDD.
if (context.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) == null) {
  synchronized {
    if (tasksRunning == 0) {
```
Just a question: if an executor with a single core has multiple assigned tasks, is this property setup executed per task?
yes, I'd say so.
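The guard in the snippet above is a plain first-in/last-out reference count: each task bumps it on entry, and only the 0→1 and 1→0 transitions trigger setup and teardown. A self-contained model of that idea (illustrative, not the PR's code):

```scala
// Model of the `tasksRunning` counter discussed above: the first task to
// arrive performs the one-time setup, the last one to leave tears it down.
final class FirstLastGuard(setup: () => Unit, teardown: () => Unit) {
  private var tasksRunning = 0

  def enter(): Unit = synchronized {
    if (tasksRunning == 0) setup()
    tasksRunning += 1
  }

  def exit(): Unit = synchronized {
    tasksRunning -= 1
    if (tasksRunning == 0) teardown()
  }
}
```

So on a single-core executor running tasks back to back, each 1→0→1 transition re-runs the setup; only overlapping tasks share one setup.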
I assume this only applies to programmatically set confs? If you set them at app launch, do they still apply? When you do a bunch of dataframe/dataset operations and then call `toRdd`, it is definitely annoying that we never indicate to the user that catalyst stuff still happened (e.g. the SQL UI isn't present).
@tgravescs all SQL configs are applied, however they were set.
yes, I quite agree on this. Unfortunately I don't have a good idea how to solve it: the actions are performed outside the "SQL" world, i.e. they are performed on the RDD by the user, so it's hard to track them within a SQL execution ID. Honestly I have no good idea for that at the moment; if you have one, I think it would be great.
```scala
 * @param conf the `SQLConf` to apply to the execution of the SQL plan
 */
class SQLExecutionRDD(
    var sqlRDD: RDD[InternalRow], @transient conf: SQLConf) extends RDD[InternalRow](sqlRDD) {
```
Do we need to pass in a `SQLConf`? I think we always capture the current `SQLConf` anyway, so just `private val sqlConfigs = SQLConf.get.getAllConfs`?
I prefer the current way, as the `SQLConf` is passed directly from the spark session, as per @cloud-fan's comment: #25643 (comment)
Ok. I'm fine with it.
```scala
// and we have nothing to do here. Otherwise, we use the `SQLConf` captured at the creation of
// this RDD.
if (context.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) == null) {
  SQLConf.withExistingConf(sqlConfExecutorSide) {
```
oh, it got better.
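`withExistingConf` follows the classic thread-local override pattern: install a conf for the duration of a block, restore the previous value on exit, and have the getter prefer the installed conf. A minimal sketch of that pattern (illustrative names and fallback values, not Spark's implementation):

```scala
// Sketch of the scoped-override pattern behind `SQLConf.withExistingConf`.
object ScopedConf {
  type Conf = Map[String, String]

  private val fallback: Conf = Map("flag" -> "default")
  private val existing = new ThreadLocal[Conf]

  // Analogue of `SQLConf.get`: a conf installed in the current scope wins.
  def get: Conf = Option(existing.get).getOrElse(fallback)

  // Install `conf` for the current thread while `f` runs, then restore.
  def withExistingConf[T](conf: Conf)(f: => T): T = {
    val previous = existing.get
    existing.set(conf)
    try f
    finally {
      if (previous == null) existing.remove() else existing.set(previous)
    }
  }
}
```

Because the override is per-thread, a conf captured on the driver can be installed around each task body on the executor without racing against other tasks.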
Test build #110200 has finished for PR 25643 at commit
```diff
@@ -115,7 +115,8 @@ object SQLConf {
   * Returns the active config object within the current scope. If there is an active SparkSession,
   * the proper SQLConf associated with the thread's active session is used. If it's called from
   * tasks in the executor side, a SQLConf will be created from job local properties, which are set
-  * and propagated from the driver side.
+  * and propagated from the driver side, unless a has been set in the scope by `withExistingConf`
```
`a has been` -> `a SQLConf has been`
Test build #110339 has finished for PR 25643 at commit
thanks, merging to master!
thanks @cloud-fan and thank you all for the reviews. Just one question: what about backporting to 2.4?
yea feel free to open a backport PR.
@mgaido91 please add a feature flag for the backport. People might rely on the current 'buggy' behavior.
@hvanhovell I feel weird having such a flag only in Spark 2.4.x and not in 3.0. I mean, if we introduce such a flag, we might want to have it in 3.0 too?
@mgaido91 people currently using 2.4 in production might rely on the old behavior, so they should be able to move back to it if this breaks something for them. For 3.0 this is not an issue since it is not released yet and is under active development; in that case we are allowed to change behavior.
Hi, All.
This seems to be an old Scala bug on JDK9+.
I'll make a follow-up soon. |
#25738 is ready. |
### What changes were proposed in this pull request?
The PR proposes to create a custom `RDD` which enables propagating `SQLConf` also in cases not tracked by SQL execution, as happens when a `Dataset` is converted to an RDD either using `.rdd` or `.queryExecution.toRdd` and the returned RDD is then used to invoke actions on it. In this way, SQL configs are effective also in these cases, while earlier they were ignored.

### Why are the changes needed?
Without this patch, whenever `.rdd` or `.queryExecution.toRdd` is used, all the SQL configs set are ignored. An example of a reproducer can be:
```
withSQLConf(SQLConf.SUBEXPRESSION_ELIMINATION_ENABLED.key -> "false") {
  val df = spark.range(2).selectExpr((0 to 5000).map(i => s"id as field_$i"): _*)
  df.createOrReplaceTempView("spark64kb")
  val data = spark.sql("select * from spark64kb limit 10")
  // Subexpression elimination is used here, even though it should have been disabled
  data.describe()
}
```

### Does this PR introduce any user-facing change?
When a user calls `.queryExecution.toRdd`, a `SQLExecutionRDD` is returned wrapping the `RDD` of the executed plan. When `.rdd` is used, an additional `SQLExecutionRDD` is present in the hierarchy.

### How was this patch tested?
added UT

Closes apache#25643 from mgaido91/SPARK-28939.
Authored-by: Marco Gaido <marcogaido91@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>