Conversation

@viirya (Member) commented Mar 7, 2017

What changes were proposed in this pull request?

Constraint propagation can be computationally expensive and block driver execution for a long time. For example, the benchmark below takes about 30 minutes.

Compared with previous PRs #16998, #16785, this is a much simpler option: add a flag to disable constraint propagation.

Benchmark

Run the following code locally.

import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}
import org.apache.spark.sql.internal.SQLConf

spark.conf.set(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key, false)

val df = (1 to 40).foldLeft(Seq((1, "foo"), (2, "bar"), (3, "baz")).toDF("id", "x0"))((df, i) => df.withColumn(s"x$i", $"x0"))

val indexers = df.columns.tail.map(c => new StringIndexer()
  .setInputCol(c)
  .setOutputCol(s"${c}_indexed")
  .setHandleInvalid("skip"))

val encoders = indexers.map(indexer => new OneHotEncoder()
  .setInputCol(indexer.getOutputCol)
  .setOutputCol(s"${indexer.getOutputCol}_encoded")
  .setDropLast(true))

val stages: Array[PipelineStage] = indexers ++ encoders
val pipeline = new Pipeline().setStages(stages)

val startTime = System.nanoTime
pipeline.fit(df).transform(df).show
val runningTime = System.nanoTime - startTime

Before this patch: 1786001 ms (~30 minutes)
After this patch: 26392 ms (less than half a minute)

Related PRs: #16998, #16785.

How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

@viirya (Member, Author) commented Mar 7, 2017

cc @sameeragarwal @hvanhovell

This is a much simpler option: add a flag to disable constraint propagation, if we are OK with skipping the optimizations that rely on constraints (InferFiltersFromConstraints, part of PruneFilters, EliminateOuterJoin) when the flag is disabled, for the uncommon cases.

@SparkQA commented Mar 7, 2017

Test build #74071 has finished for PR 17186 at commit 44e494b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class InferFiltersFromConstraints(conf: CatalystConf)
  • case class PruneFilters(conf: CatalystConf) extends Rule[LogicalPlan] with PredicateHelper
  • case class EliminateOuterJoin(conf: CatalystConf) extends Rule[LogicalPlan] with PredicateHelper

- object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelper {
+ case class InferFiltersFromConstraints(conf: CatalystConf)
+   extends Rule[LogicalPlan] with PredicateHelper {
    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
Contributor

Just check the flag before you start transforming the tree. That is a lot simpler & faster.
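
A minimal sketch of this suggestion (based on the diff above; the private inferFilters helper matches the shape that appears later in this review):

    case class InferFiltersFromConstraints(conf: CatalystConf)
      extends Rule[LogicalPlan] with PredicateHelper {

      def apply(plan: LogicalPlan): LogicalPlan = {
        // When the flag is off this rule is a pure no-op, so skip the tree
        // traversal entirely instead of guarding each case inside it.
        if (conf.constraintPropagationEnabled) inferFilters(plan) else plan
      }

      private def inferFilters(plan: LogicalPlan): LogicalPlan = plan transform {
        case filter @ Filter(condition, child) =>
          // ... original constraint-based filter inference, unchanged ...
          filter
      }
    }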

Member

+1; this rule is just a no-op if constraints aren't inferred.

Member Author

Done.

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-   case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _)) =>
+   case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _))
+     if conf.constraintPropagationEnabled =>
Contributor

This is far too restrictive. We can still eliminate outer joins without constraint propagation.
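
A sketch of how this can be resolved (consistent with the getConstraints approach the PR adopts later in this thread; the join-type decision is elided): the rule keeps matching unconditionally, and only the inferred constraints are gated by the flag.

    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
      // No flag in the pattern guard: outer-join elimination still runs.
      case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _)) =>
        // User-written predicates are always considered; inferred constraints
        // are added only when propagation is enabled.
        val conditions = splitConjunctivePredicates(condition) ++
          f.getConstraints(conf.constraintPropagationEnabled)
        // ... derive the new join type from `conditions` as before ...
        f
    }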

@hvanhovell (Contributor)

@viirya could you add a test?

@viirya (Member, Author) commented Mar 7, 2017

@hvanhovell It is late in my local time, so I have addressed the comments first. I will add tests tomorrow.

@SparkQA commented Mar 7, 2017

Test build #74108 has finished for PR 17186 at commit ae9f037.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sameeragarwal (Member) left a comment

Instead of disabling every optimization, can't we just make lazy val constraints: ExpressionSet return ExpressionSet(Set.empty) in QueryPlan to achieve the same goal?
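
A sketch of the idea (illustrative only; as the replies below note, QueryPlan has no direct access to the conf, which is the main obstacle):

    // Inside QueryPlan: short-circuit constraint computation at the source,
    // so every rule that reads `constraints` sees an empty set.
    lazy val constraints: ExpressionSet =
      if (constraintPropagationEnabled) {  // assumes the flag were reachable here
        ExpressionSet(getRelevantConstraints(validConstraints))
      } else {
        ExpressionSet(Set.empty)
      }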

@viirya (Member, Author) commented Mar 7, 2017

@sameeragarwal To do that, we may need to change constraints into a method taking a CatalystConf. Since constraints is public, is that a good idea?

@viirya (Member, Author) commented Mar 8, 2017

@sameeragarwal Btw, another point: if we do that, we still need to transform the plan even when the flag is disabled.

@viirya (Member, Author) commented Mar 8, 2017

@sameeragarwal Rethinking it, I think having QueryPlan return constraints depending on the flag is easier to test. I will give it a try.

@viirya (Member, Author) commented Mar 8, 2017

@hvanhovell @sameeragarwal Following @sameeragarwal's comment, instead of disabling every optimization, a new method getConstraints in QueryPlan now returns empty constraints if the flag is disabled, and the original propagated constraints otherwise.

Please take a look. Thanks.
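
A sketch of the described change (the exact signature is assumed from this discussion; see the PR diff for the final form):

    // In QueryPlan: callers ask for constraints through this method instead of
    // reading the lazy val directly. Since `constraints` is lazy, disabling the
    // flag skips the potentially expensive constraint computation entirely.
    def getConstraints(constraintPropagationEnabled: Boolean): ExpressionSet =
      if (constraintPropagationEnabled) constraints
      else ExpressionSet(Set.empty)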

@viirya (Member, Author) commented Mar 8, 2017

Btw, test cases are added.

@viirya viirya force-pushed the add-flag-disable-constraint-propagation branch from c863f67 to 3eda726 Compare March 8, 2017 04:03
@SparkQA commented Mar 8, 2017

Test build #74171 has finished for PR 17186 at commit 8318152.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 8, 2017

Test build #74173 has finished for PR 17186 at commit c863f67.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 8, 2017

Test build #74174 has finished for PR 17186 at commit 3eda726.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 9, 2017

Test build #74248 has started for PR 17186 at commit eb200d6.

@viirya (Member, Author) commented Mar 9, 2017

retest this please.

@SparkQA commented Mar 9, 2017

Test build #74253 has finished for PR 17186 at commit eb200d6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public static class LongWrapper
  • public static class IntWrapper
  • case class CostBasedJoinReorder(conf: CatalystConf) extends Rule[LogicalPlan] with PredicateHelper
  • case class JoinPlan(itemIds: Set[Int], plan: LogicalPlan, joinConds: Set[Expression], cost: Cost)
  • case class Cost(rows: BigInt, size: BigInt)
  • abstract class RepartitionOperation extends UnaryNode
  • case class FlatMapGroupsWithState(
  • class CSVOptions(
  • class UnivocityParser(
  • trait WatermarkSupport extends UnaryExecNode
  • case class FlatMapGroupsWithStateExec(

@viirya (Member, Author) commented Mar 10, 2017

@hvanhovell @sameeragarwal Please let me know if you have more thoughts on the new change. Thanks.

@sameeragarwal (Member)

Thanks @viirya, this approach makes sense to me. Can you please modify InferFiltersFromConstraints? I'll then take a closer look.

@sameeragarwal (Member)

Here's another instantiation of the underlying bug: https://issues.apache.org/jira/browse/SPARK-19875

.createWithDefault(false)

val CONSTRAINT_PROPAGATION_ENABLED = buildConf("spark.sql.constraintPropagation.enabled")
.internal()
Member

This should not be an internal flag, right? cc @sameeragarwal @hvanhovell

Member Author

Not sure about it, because constraint propagation is an internal detail.

Member

To determine whether a flag is internal or not, we should consider the impact on external users. If users could easily hit this, we might need to expose it as an external flag and document it in the public documentation.

Member Author (@viirya, Mar 14, 2017)

Given that a few users have reported hitting this issue, we may need to expose it as an external flag. But it looks like a less common issue compared with other external flags.

However, I would think that a large portion of external users may not know about constraint propagation. It might not be intuitive for them to link the problem they hit to constraint propagation and to find this config, even if it is external.

Member

Constraint propagation is like predicate pushdown, which is already exposed in external configurations. We can rename it to make it easier for external users to understand, e.g., constraint inference.

@SparkQA commented Mar 14, 2017

Test build #74473 has finished for PR 17186 at commit 0e204bc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member, Author) commented Mar 14, 2017

@sameeragarwal Thanks for the comment. I've updated InferFiltersFromConstraints.

@SparkQA commented Mar 15, 2017

Test build #74581 has started for PR 17186 at commit d3b0a72.

@SparkQA commented Mar 15, 2017

Test build #74588 has finished for PR 17186 at commit d4c9a5e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class JsonDataSource extends Serializable

val optimized = OptimizeDisableConstraintPropagation.execute(queryWithUselessFilter.analyze)
// When constraint propagation is disabled, the useless filter won't be pruned.
// It gets pushed down. Because the rule `CombineFilters` runs only once, there are redundant
// and duplicate filters.
Contributor

This behaviour does not make sense to me. If I write a query like

select * from (select * from t1 where t1.a1 > 1) tx where tx.a1 > 1

I expect Spark to evaluate the predicate only once. The wording "constraint propagation" is misleading: in this example, there is no propagation at all. Perhaps we want to distinguish the "constraints" written originally from the ones inferred from relationships with other predicates. When "propagation" (or perhaps a more meaningful term, "predicate inference") is set to OFF, we want to exclude those inferred predicates from def constraints.

Member Author

To clarify: this behaviour is limited to this test case, which is why I added the comment. In normal optimization, CombineFilters runs multiple times and the predicates get combined.
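
An illustration of the scenario (hypothetical relation and attribute names, in the style of Catalyst's optimizer test DSL):

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.dsl.plans._
    import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

    val testRelation = LocalRelation('a.int, 'b.int)

    // The same predicate written twice at different levels of the plan.
    val query = testRelation
      .where('a > 1)
      .select('a)
      .where('a > 1)

    // Fixed-point optimizer: predicate push-down moves the outer filter below
    // the Project and CombineFilters merges it with the inner one, so the
    // duplicate disappears. A test batch that runs Once stops after the
    // push-down, leaving the redundant filter in place.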

Contributor (@nsyca, Mar 15, 2017)

I am aware of that. My point is that when users turn on this setting in the hope of alleviating the long compilation time, they will get the "unintentional" side effect of possibly lengthening execution time by evaluating the same predicate twice.

Overall, I agree with your approach, but the point I raised could be follow-up work.

Member Author

This is a short-term workaround. I actually proposed another approach that introduces a new data structure for constraint propagation in #16998, but it is more complex and may need more time to consider and review.

@viirya (Member, Author) commented Mar 20, 2017

ping @sameeragarwal This is updated according to your previous comment. Can you help review this? Thanks.

@viirya (Member, Author) commented Mar 22, 2017

ping @sameeragarwal Is it possible for this to go in before the 2.2 code freeze? Please let me know, thanks.

@SparkQA commented Mar 22, 2017

Test build #75043 has finished for PR 17186 at commit da09d9f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sameeragarwal (Member)

Sorry @viirya, I'll review this first thing tomorrow morning.

@viirya (Member, Author) commented Mar 23, 2017

@sameeragarwal Thanks a lot.

@sameeragarwal (Member) left a comment

This looks great overall, I just left a few minor comments. Thanks!


private def inferFilters(plan: LogicalPlan): LogicalPlan = plan transform {
case filter @ Filter(condition, child) =>
val constraintEnabled = conf.constraintPropagationEnabled
Member

this is unused

Member Author

Oh, I missed that.


val CONSTRAINT_PROPAGATION_ENABLED = buildConf("spark.sql.constraintPropagation.enabled")
.internal()
.doc("When true, the query optimizer will use constraint propagation in query plans to " +
Member

nit: 'get around the issue' might sound pretty vague to a non-expert user. How about something along these lines?

    .doc("When true, the query optimizer will infer and propagate data constraints in the query " +
      "plan to optimize them. Constraint propagation can sometimes be computationally expensive" +
      "for certain kinds of query plans (such as those with a large number of predicates and " +
      "aliases) which might negatively impact overall runtime.")

Member Author

Looks good.
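
Assembled from the fragments above, the complete flag definition then reads roughly as follows (a sketch; the default is assumed to be true so that propagation stays on unless explicitly disabled):

    val CONSTRAINT_PROPAGATION_ENABLED = buildConf("spark.sql.constraintPropagation.enabled")
      .internal()
      .doc("When true, the query optimizer will infer and propagate data constraints in the " +
        "query plan to optimize them. Constraint propagation can sometimes be computationally " +
        "expensive for certain kinds of query plans (such as those with a large number of " +
        "predicates and aliases) which might negatively impact overall runtime.")
      .booleanConf
      .createWithDefault(true)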

CombineFilters) :: Nil
}

object OptimizeDisableConstraintPropagation extends RuleExecutor[LogicalPlan] {
Member

nit: perhaps OptimizeWithConstraintPropagationDisabled?

PushPredicateThroughJoin) :: Nil
}

object OptimizeDisableConstraintPropagation extends RuleExecutor[LogicalPlan] {
Member

nit: same as above

    EliminateSubqueryAliases) ::
  Batch("Outer Join Elimination", Once,
-   EliminateOuterJoin,
+   EliminateOuterJoin(SimpleCatalystConf(caseSensitiveAnalysis = true)),
Member

Can we add a test for outer join elimination as well?

Member Author

Added a test.

@viirya (Member, Author) commented Mar 24, 2017

@sameeragarwal Thanks for the review! I've addressed all the comments now. Please take a look when you have a chance.

@SparkQA commented Mar 24, 2017

Test build #75139 has finished for PR 17186 at commit a02c8cb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sameeragarwal (Member)

LGTM, thanks! cc @hvanhovell

@rxin (Contributor) commented Mar 24, 2017

Merging in master.

@asfgit asfgit closed this in e011004 Mar 24, 2017
jzhuge pushed a commit to jzhuge/spark that referenced this pull request Aug 20, 2018

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes apache#17186 from viirya/add-flag-disable-constraint-propagation.
@xyxiaoyou
May I ask how to solve this issue in Spark 2.3?
I see that this flag has been removed from PruneFilters and EliminateOuterJoin. @viirya @gatorsmile

@gatorsmile (Member)

@xyxiaoyou SQLConf.get ?
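
What this likely refers to (a sketch; in Spark 2.3+ optimizer rules read the active session's SQLConf statically instead of taking a CatalystConf constructor parameter):

    import org.apache.spark.sql.internal.SQLConf

    // Inside a rule: read the flag from the thread-local active conf.
    val enabled = SQLConf.get.constraintPropagationEnabled

    // Users can still disable it per session, exactly as in the benchmark above:
    spark.conf.set(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key, false)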

@xyxiaoyou

> @xyxiaoyou SQLConf.get ?

What I want to ask is whether this issue has been resolved in Spark 2.3/2.4. Is there a more appropriate solution? I ask because I found that more complex SQL can cause the executor to get stuck.

@xyxiaoyou
Hi @gatorsmile, when using 'create or replace view test_view as ...', Spark will first run a 'select ...' query job. This makes creating the view particularly slow. Is there a switch so that Spark skips the query, or creates the view faster?

(screenshot omitted)

@xyxiaoyou

(screenshots omitted)

@xyxiaoyou
@gatorsmile This problem has been bothering our team for a long time. I hope you can give us some suggestions or help us solve it. Thanks a lot.

@viirya viirya deleted the add-flag-disable-constraint-propagation branch December 27, 2023 18:34
@ahshahid
@xyxiaoyou : for your reference: take a look at https://issues.apache.org/jira/browse/SPARK-33152
and the corresponding PR (though it has been closed, the solution is there).
