[SPARK-19665][SQL] Improve constraint propagation #16998
Conversation
Force-pushed from 24fb723 to 917de74 (compare).
Test build #73159 has finished for PR 16998 at commit
Force-pushed from 917de74 to d691c66 (compare).
@hvanhovell Yes. #16785 only makes a limited improvement. Both #16785 and this PR are non-parallel approaches.
Test build #73158 has finished for PR 16998 at commit
Test build #73163 has finished for PR 16998 at commit
Force-pushed from d691c66 to 6cb896f (compare).
Test build #73174 has finished for PR 16998 at commit
I do not get your point. What does this mean? Constraint propagation is a bottom-up mechanism for inferring constraints. Can you elaborate your idea more formally? I have not read the code; I am just wondering whether we could miss plan-optimization opportunities after this PR. What is the negative impact, if any?
We currently fully expand the constraints with aliased attributes. For example, if there is a constraint "a > b" and the current query plan aliases "a" to "c" and "d", the final constraints of this plan are "a > b", "c > b", and "d > b". These constraints all evaluate to the same value, either all true or all false. So when inferring filters from the constraints, we only need "a > b"; the aliased constraints "c > b" and "d > b" are not necessary.
The only optimization I think would be affected is …. However, this is not a big impact, and it can be easily solved: we can use a simple method to check whether a given condition like "c > b" is contained in the fully expanded constraints of a query plan, without actually fully expanding the constraints.
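A minimal sketch of that containment check, in plain Scala with a toy constraint representation (not Catalyst's actual API, where constraints are `Expression` trees): keep only the base constraints plus an alias map, and rewrite a queried condition back to the original attributes instead of materializing every aliased combination.

```scala
// Toy constraint representation for illustration only.
case class Constraint(left: String, op: String, right: String)

val baseConstraints = Set(Constraint("a", ">", "b"))
// "c" and "d" are aliases of "a" introduced by a Project.
val aliasToOriginal = Map("c" -> "a", "d" -> "a")

// Rewrite aliases back to the original attributes.
def canonical(c: Constraint): Constraint =
  Constraint(
    aliasToOriginal.getOrElse(c.left, c.left),
    c.op,
    aliasToOriginal.getOrElse(c.right, c.right))

// Answer "is this condition implied?" without generating {a > b, c > b, d > b}.
def contains(c: Constraint): Boolean = baseConstraints.contains(canonical(c))

assert(contains(Constraint("c", ">", "b")))   // implied via alias c -> a
assert(contains(Constraint("d", ">", "b")))   // implied via alias d -> a
assert(!contains(Constraint("b", ">", "a")))  // not implied
```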
@viirya Please correct me if I'm wrong, but scanning through this patch, it appears that the underlying problem is that duplicating and tracking aliased constraints using a …
By the way, as an aside, we should probably allow constraint inference/propagation to be turned off via a conf flag, to provide a quick workaround against these kinds of problems.
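For reference, a minimal sketch of how such a switch would be used, assuming the `spark.sql.constraintPropagation.enabled` / `SQLConf.CONSTRAINT_PROPAGATION_ENABLED` config that was eventually added in #17186 (quoted later in this thread):

```scala
import org.apache.spark.sql.internal.SQLConf

// Disable constraint inference/propagation as a stop-gap workaround.
// Assumes the config added in #17186 and an existing SparkSession named `spark`.
spark.conf.set(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key, false)
// equivalently:
spark.conf.set("spark.sql.constraintPropagation.enabled", "false")
```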
@sameeragarwal That's correct.
Since we use constraints in optimization, if we turn off constraint inference/propagation, wouldn't we miss optimization opportunities for query plans?
Force-pushed from 6cb896f to 5be21b3 (compare).
Test build #73316 has finished for PR 16998 at commit
@hvanhovell Do you have time to review this?
I just ran into the same issue with … Here's a minimal test case:

```scala
import org.apache.spark.sql.functions.{col, lit}
import spark.implicits._ // for toDF; assumes spark-shell or an existing SparkSession

val max = 12 // try increasing this
val df = Seq.empty[Int].toDF
val filter = (for (i <- 0 to max)
  yield col("value") <=> i) reduce (_ || _)
val projections = for (i <- 0 to max)
  yield (col("value") <=> i).as(s"value_$i")
val dummy = lit(true) // this can be anything
val result = df.filter(dummy).select(projections: _*).filter(filter).filter(dummy)
result.explain
```

The …
@hvanhovell Do you have any thoughts on this already? Please let me know. Thanks!
Not really. Constraint propagation will still be enabled by default in Spark. The flag would just be a hammer to quickly get around issues like this and SPARK-17733.
@viirya I'll take a closer look at this patch, but given that this PR primarily introduces a data structure that keeps track of aliased constraints, is there a fundamental reason for changing the underlying behavior (and restricting the optimization space)? Could there be a simpler alternative where we still keep the old semantics?
Yeah, of course. I meant that when you disable the flag, you wouldn't get the optimizations that rely on constraint propagation. I will create another PR for this option.
I haven't found an alternative fix that keeps the old semantics, leaves the propagation structure unchanged, and still largely improves performance. #16785 keeps the old semantics and does not change the propagation structure, but it only cuts the benchmark's running time in half. Adding the flag is a simpler option.
## What changes were proposed in this pull request?

Constraint propagation can be computationally expensive and block the driver execution for a long time. For example, the benchmark below needs 30 minutes. Compared with the previous PRs #16998 and #16785, this is a much simpler option: add a flag to disable constraint propagation.

### Benchmark

Run the following code locally.

```scala
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}
import org.apache.spark.sql.internal.SQLConf

spark.conf.set(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key, false)

val df = (1 to 40).foldLeft(Seq((1, "foo"), (2, "bar"), (3, "baz")).toDF("id", "x0"))(
  (df, i) => df.withColumn(s"x$i", $"x0"))

val indexers = df.columns.tail.map(c => new StringIndexer()
  .setInputCol(c)
  .setOutputCol(s"${c}_indexed")
  .setHandleInvalid("skip"))

val encoders = indexers.map(indexer => new OneHotEncoder()
  .setInputCol(indexer.getOutputCol)
  .setOutputCol(s"${indexer.getOutputCol}_encoded")
  .setDropLast(true))

val stages: Array[PipelineStage] = indexers ++ encoders
val pipeline = new Pipeline().setStages(stages)

val startTime = System.nanoTime
pipeline.fit(df).transform(df).show
val runningTime = System.nanoTime - startTime
```

Before this patch: 1786001 ms ~= 30 mins
After this patch: 26392 ms = less than half a minute

Related PRs: #16998, #16785.

## How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #17186 from viirya/add-flag-disable-constraint-propagation.
Now that we've added the flag, this issue is not urgent for now. I'll close this first.
…ional (#179) [SNAP-3195] Exposing `spark.sql.constraintPropagation.enabled` config to disable optimization rules related to constraint propagation. Cherry-picked from e011004 and resolved merge conflicts. Original commit message: [SPARK-19846][SQL] Add a flag to disable constraint propagation.
## What changes were proposed in this pull request?
If there are aliased expressions in the projection, we propagate constraints by completely expanding the original constraints with the aliases.
This expansion becomes computationally expensive as the number of aliases increases.
Fully expanding all constraints all the time makes iterative ML algorithms, where an ML pipeline has many stages, run very slowly. See #16785.
Another issue is that we actually don't need the additional constraints most of the time. For example, suppose there is a constraint "a > b" and "a" is aliased to "c" and "d". When we use this constraint in filtering, we don't need all of "a > b", "c > b", and "d > b"; we only need "a > b", because if it is false, all the other constraints are guaranteed to be false too.
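For illustration, a small hypothetical DataFrame query that produces exactly this situation: the filter contributes the constraint "a > b", and the projection aliases "a" to "c" and "d", so full expansion would also carry "c > b" and "d > b".

```scala
import org.apache.spark.sql.functions.col
import spark.implicits._ // assumes spark-shell or an existing SparkSession named `spark`

val df = Seq((1, 0), (2, 5)).toDF("a", "b")

val plan = df
  .filter(col("a") > col("b"))  // contributes the constraint: a > b
  .select(col("a"), col("b"), col("a").as("c"), col("a").as("d"))  // aliases a -> c, d

// With full expansion, the Project's constraint set also contains c > b and d > b,
// although all three conditions always evaluate to the same value.
plan.explain(true)
```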
### Benchmark
Run the following code locally.
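Presumably this is the same ML-pipeline workload quoted in the #17186 commit message earlier in this thread, minus the line that disables the flag; reproduced here under that assumption.

```scala
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}
import spark.implicits._ // assumes spark-shell or an existing SparkSession named `spark`

val df = (1 to 40).foldLeft(Seq((1, "foo"), (2, "bar"), (3, "baz")).toDF("id", "x0"))(
  (df, i) => df.withColumn(s"x$i", $"x0"))

val indexers = df.columns.tail.map(c => new StringIndexer()
  .setInputCol(c)
  .setOutputCol(s"${c}_indexed")
  .setHandleInvalid("skip"))

val encoders = indexers.map(indexer => new OneHotEncoder()
  .setInputCol(indexer.getOutputCol)
  .setOutputCol(s"${indexer.getOutputCol}_encoded")
  .setDropLast(true))

val stages: Array[PipelineStage] = indexers ++ encoders
val pipeline = new Pipeline().setStages(stages)

val startTime = System.nanoTime
pipeline.fit(df).transform(df).show
val runningTime = System.nanoTime - startTime
```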
Before this patch: 1786001 ms ~= 30 mins
After this patch: 49972 ms = less than 1 min
## How was this patch tested?
Jenkins tests.
Please review http://spark.apache.org/contributing.html before opening a pull request.