
Conversation

@viirya
Member

@viirya viirya commented Feb 3, 2017

What changes were proposed in this pull request?

This issue is originally reported and discussed at http://apache-spark-developers-list.1001551.n3.nabble.com/SQL-ML-Pipeline-performance-regression-between-1-6-and-2-x-tc20803.html

When running an ML Pipeline with many stages, which iteratively updates the Dataset, it is observed that fit and transform take longer and longer to finish as the query plan grows continuously.

The example code is shown in the benchmark below.

Specifically, the time spent preparing the optimized plan on the current branch is much higher than on 1.6. Most of this time is spent generating the query plan's constraints in a few optimization rules.

getAliasedConstraints is found to be the function costing most of the running time, because the number of constraints to alias grows very fast.

This patch tries to improve the performance of getAliasedConstraints.

Benchmark

Run the following code locally (e.g., in spark-shell, where spark.implicits._ is already in scope).

import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}

val df = (1 to 40).foldLeft(Seq((1, "foo"), (2, "bar"), (3, "baz")).toDF("id", "x0"))((df, i) => df.withColumn(s"x$i", $"x0"))

val indexers = df.columns.tail.map(c => new StringIndexer()
  .setInputCol(c)
  .setOutputCol(s"${c}_indexed")
  .setHandleInvalid("skip"))

val encoders = indexers.map(indexer => new OneHotEncoder()
  .setInputCol(indexer.getOutputCol)
  .setOutputCol(s"${indexer.getOutputCol}_encoded")
  .setDropLast(true))

val stages: Array[PipelineStage] = indexers ++ encoders
val pipeline = new Pipeline().setStages(stages)

val startTime = System.nanoTime
pipeline.fit(df).transform(df).show
val runningTime = System.nanoTime - startTime

Before this patch: 1786001 ms
After this patch: 843688 ms

More than half of the original running time is saved.

How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

@SparkQA

SparkQA commented Feb 3, 2017

Test build #72303 has started for PR 16785 at commit b4e514a.

@viirya
Member Author

viirya commented Feb 3, 2017

retest this please.

@SparkQA

SparkQA commented Feb 3, 2017

Test build #72305 has finished for PR 16785 at commit b4e514a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya viirya changed the title [SPARK-19443][SQL] The function to generate constraints takes too long when the query plan grows continuously [SPARK-19443][SQL][WIP] The function to generate constraints takes too long when the query plan grows continuously Feb 3, 2017
@viirya
Member Author

viirya commented Feb 6, 2017

The rewritten logic is not correct. I am working to improve this with another approach.

@viirya
Member Author

viirya commented Feb 9, 2017

I haven't found a way to improve getAliasedConstraints significantly by rewriting its logic. The current approach to improving its performance is to use a parallel collection to do the transformation in parallel. It can cut the running time by half (see the benchmark in the PR description), but the running time (13.5 secs) is still too long compared with 1.6.

We may consider #16775, another solution that fixes this issue by checkpointing datasets for pipelines with long chains of stages, or adopt both.
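For reference, a minimal sketch (not part of this PR) of the checkpointing alternative mentioned above: Dataset.checkpoint() materializes the data and truncates the logical plan, so later pipeline stages start from a short lineage. The checkpoint directory below is only an example path, and df is the DataFrame from the benchmark above.

// Hedged sketch of lineage truncation via checkpointing; the path is an example.
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

// Eager checkpoint (the default): materializes the Dataset and returns one whose logical
// plan no longer carries the accumulated chain of projections from earlier stages.
val truncated = df.checkpoint()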

@viirya
Member Author

viirya commented Feb 9, 2017

Since this change is related to SQL, cc @cloud-fan @hvanhovell

@SparkQA

SparkQA commented Feb 9, 2017

Test build #72625 has finished for PR 16785 at commit 8c98a5c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya viirya changed the title [SPARK-19443][SQL][WIP] The function to generate constraints takes too long when the query plan grows continuously [SPARK-19443][SQL] The function to generate constraints takes too long when the query plan grows continuously Feb 9, 2017
val parAllConstraints = child.constraints.asInstanceOf[Set[Expression]].filter { constraint =>
  constraint.references.intersect(relativeReferences).nonEmpty
}.par
parAllConstraints.tasksupport = UnaryNode.taskSupport
Contributor

Why are we using a custom task support instead of the default (which uses the global fork-join executor)?

Member Author

Do they have the same parallelism level? BTW, I saw that the parallel collections used in other places in Spark all take a custom task support.

Contributor

Whether they do or not depends on the implementation of the default task support. But even if they use the same level of parallelism, they're distinct executors, which means they won't share a common thread pool or task queue. I don't know why Spark uses a custom task support in other places; it may be to avoid engaging all of the CPU cores on the host machine. But then it seems more efficient for Spark to have its own global task support.
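For context, a minimal standalone sketch (assuming Scala 2.12, where parallel collections are still part of the standard library) of the distinction discussed above: a parallel collection runs on the default task support backed by the global fork-join pool unless a custom ForkJoinTaskSupport with its own pool and parallelism level is assigned.

import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport

// Default task support: tasks are scheduled on the shared, global fork-join pool.
val withDefault = (1 to 1000).toVector.par
val doubled = withDefault.map(_ * 2)

// Custom task support: a dedicated pool with an explicit parallelism level,
// independent of the global executor (and of any other custom pools).
val withCustom = (1 to 1000).toVector.par
withCustom.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
val sum = withCustom.map(_ * 2).sum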

@hvanhovell
Contributor

@viirya this looks like a very big hammer to solve this problem. Can't we try a different approach?

I think we should try to avoid re-optimizing already optimized code snippets; you might be able to do this using some kind of fence. It would be even better if we had a recursive node.

@viirya
Member Author

viirya commented Feb 10, 2017

@hvanhovell Yeah, I think so. As noted in a previous comment, I haven't found a way yet to improve getAliasedConstraints significantly by rewriting its logic without a parallel collection.

We may consider #16775, another solution that fixes this issue by checkpointing datasets for pipelines with long chains of stages.

@cloud-fan
Contributor

Can we consider this from a higher-level view instead of focusing on the method getAliasedConstraints? Maybe there is a way to do constraint propagation faster.

@viirya
Member Author

viirya commented Feb 11, 2017

@cloud-fan Yeah, I agree with you and @hvanhovell.

For the slow constraint propagation, besides attacking getAliasedConstraints as this change does, maybe there is another way to improve the constraint propagation process.

If we can't, then for such long lineages I think we should use checkpointing to fix it, as in #16775.

@viirya
Member Author

viirya commented Feb 17, 2017

this looks like a very big hammer to solve this problem. Can't we try a different approach?
I think we should try to avoid optimizing already optimized code snippets, you might be able to do this using some kind of a fence. It would even be better if we would have a recursive node.

@cloud-fan @hvanhovell Ok. I've figured out that adding a filter can reduce the candidate constraints for aliasing. It achieves the same speed-up (the benchmark running time is cut in half) without the parallel collection hammer.

Could you find time to look at it? Thanks.

@viirya viirya force-pushed the improve-constraints-generation branch from 4ba93fe to 278c31c Compare February 17, 2017 03:29
@SparkQA

SparkQA commented Feb 17, 2017

Test build #73034 has finished for PR 16785 at commit 4ba93fe.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 17, 2017

Test build #73035 has finished for PR 16785 at commit 278c31c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hvanhovell
Contributor

cc @sameeragarwal

// For example, for a constraint 'a > b', if 'a' is aliased to 'c', we need to get aliased
// constraint 'c > b' only if 'b' is in output.
var allConstraints = child.constraints.filter { constraint =>
  constraint.references.subsetOf(relativeReferences)
Member

Hi @viirya, I understand that #16998 probably supersedes this, but just out of curiosity, did you see a lot of benefit from pruning these attributes here given that we already prune them later in QueryPlan?

Member Author

Yes. You can see the benchmark in the PR description. With this attribute pruning, the running time is cut in half.

If I understand your comment correctly, pruning them later in QueryPlan means we prune constraints which don't refer to attributes in outputSet.

But the pruning here happens before the pruning you pointed out: we need to reduce the set of constraints fed into the aliasing transformation to lower the computation cost.
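To illustrate the point, a simplified model (hypothetical names, not the actual Catalyst code): the per-alias substitution touches every surviving constraint, so pruning before that loop reduces the work it has to do, whereas the later pruning in QueryPlan only discards results after they have already been computed.

// Simplified stand-ins for Catalyst's Expression and AttributeSet.
final case class Constraint(pred: String, references: Set[String])

def aliasedConstraints(
    constraints: Set[Constraint],
    aliases: Seq[(String, String)],        // (attribute, alias) pairs from the project list
    relevantRefs: Set[String]): Set[Constraint] = {
  // Prune first, as the patch does: keep only constraints whose references are all relevant.
  val candidates = constraints.filter(_.references.subsetOf(relevantRefs))
  // The expensive step below is roughly proportional to candidates.size * aliases.size,
  // so shrinking `candidates` up front is cheaper than discarding results afterwards.
  aliases.foldLeft(candidates) { case (acc, (attr, alias)) =>
    acc ++ acc.collect {
      case c if c.references.contains(attr) =>
        Constraint(c.pred.replace(attr, alias), c.references - attr + alias)
    }
  }
}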

asfgit pushed a commit that referenced this pull request Mar 24, 2017
## What changes were proposed in this pull request?

Constraint propagation can be computationally expensive and block driver execution for a long time. For example, the benchmark below needs 30 mins.

Compared with previous PRs #16998, #16785, this is a much simpler option: add a flag to disable constraint propagation.

### Benchmark

Run the following code locally.

    import org.apache.spark.ml.{Pipeline, PipelineStage}
    import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}
    import org.apache.spark.sql.internal.SQLConf

    spark.conf.set(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key, false)

    val df = (1 to 40).foldLeft(Seq((1, "foo"), (2, "bar"), (3, "baz")).toDF("id", "x0"))((df, i) => df.withColumn(s"x$i", $"x0"))

    val indexers = df.columns.tail.map(c => new StringIndexer()
      .setInputCol(c)
      .setOutputCol(s"${c}_indexed")
      .setHandleInvalid("skip"))

    val encoders = indexers.map(indexer => new OneHotEncoder()
      .setInputCol(indexer.getOutputCol)
      .setOutputCol(s"${indexer.getOutputCol}_encoded")
      .setDropLast(true))

    val stages: Array[PipelineStage] = indexers ++ encoders
    val pipeline = new Pipeline().setStages(stages)

    val startTime = System.nanoTime
    pipeline.fit(df).transform(df).show
    val runningTime = System.nanoTime - startTime

Before this patch: 1786001 ms (~30 mins)
After this patch: 26392 ms, less than half a minute

Related PRs: #16998, #16785.

## How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #17186 from viirya/add-flag-disable-constraint-propagation.
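As a usage note, the flag can also be flipped by its string key (spark.sql.constraintPropagation.enabled, the key the snappy-spark backport below also exposes), for example from spark-shell:

// Disable constraint propagation without importing SQLConf; both forms set the same key.
spark.conf.set("spark.sql.constraintPropagation.enabled", "false")
spark.sql("SET spark.sql.constraintPropagation.enabled=false")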
@viirya viirya closed this Mar 31, 2017
jzhuge pushed a commit to jzhuge/spark that referenced this pull request Aug 20, 2018
vatsalmevada pushed a commit to TIBCOSoftware/snappy-spark that referenced this pull request Oct 23, 2019
…ional (#179)

[SNAP-3195] Exposing `spark.sql.constraintPropagation.enabled` config
to disable optimization rules related to constraint propagation.

Cherry-picked from e011004 and resolved
merge conflicts.

@viirya viirya deleted the improve-constraints-generation branch December 27, 2023 18:34