
Conversation

@viirya
Member

viirya commented May 18, 2019

What changes were proposed in this pull request?

A performance issue with explode was found: when a complex field contains a huge array, that field gets duplicated once per exploded array element. For example:

// M is the number of elements in the generated array (the benchmark uses a large value)
val df = spark.sparkContext.parallelize(Seq(("1",
  Array.fill(M)({
    val i = math.random
    (i.toString, (i + 1).toString, (i + 2).toString, (i + 3).toString)
  })))).toDF("col", "arr")
  .selectExpr("col", "struct(col, arr) as st")
  .selectExpr("col", "st.col as col1", "explode(st.arr) as arr_col")

The explode causes st to be duplicated as many times as there are exploded elements.

Benchmark before the change:

[info] Java HotSpot(TM) 64-Bit Server VM 1.8.0_202-b08 on Mac OS X 10.14.4
[info] Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
[info] generate big nested struct array:         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] generate big nested struct array wholestage off          52668          53162         699          0.0      877803.4       1.0X
[info] generate big nested struct array wholestage on          47261          49093        1125          0.0      787690.2       1.1X
[info]

The query plan:

== Physical Plan ==
 Project [col#508, st#512.col AS col1#515, arr_col#519]
 +- Generate explode(st#512.arr), [col#508, st#512], false, [arr_col#519]
    +- Project [_1#503 AS col#508, named_struct(col, _1#503, arr, _2#504) AS st#512]
       +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#503, mapobjects(MapObjects_loopValue84, MapObjects_loopIsNull84,      ObjectType(class scala.Tuple4), if (isnull(lambdavariable(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), true)))     null else named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), true))._1, true, false), _2, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), true))._2, true, false), _3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String,     StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), true))._3, true,  false), _4, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue84,   MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), true))._4, true, false)), knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, None) AS _2#504]
          +- Scan[obj#534]

This patch takes a nested-column-pruning approach to prune unnecessary nested fields. It adds a projection of the needed nested fields as aliases on the child of Generate, and substitutes them with alias attributes in the projection on top of Generate.
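The rewrite can be illustrated with a small self-contained sketch. Note that `Expr`, `Attr`, `GetField`, `Alias`, and `pruneNestedFields` below are toy names invented for illustration, not Spark's actual Catalyst classes: nested field accesses in the project list above Generate are each given a fresh alias, the alias expression is pushed into the child Project, and the parent keeps only a reference to the alias, so only the small field (not the whole struct) flows through Generate.

```scala
// Toy expression tree modeling the idea behind the rule (not Catalyst itself).
sealed trait Expr
case class Attr(name: String) extends Expr                   // a column reference
case class GetField(child: Expr, field: String) extends Expr // e.g. st.col
case class Alias(child: Expr, name: String) extends Expr     // expr AS name

// Replace each nested field access with a reference to a fresh alias,
// and collect the aliases that must be computed below Generate.
def pruneNestedFields(projectList: Seq[Expr]): (Seq[Expr], Seq[Alias]) = {
  var aliases = Vector.empty[Alias]
  def rewrite(e: Expr): Expr = e match {
    case g: GetField =>
      val a = Alias(g, s"_gen_alias_${aliases.size}")
      aliases :+= a
      Attr(a.name)              // the parent now references the alias only
    case Alias(c, n) => Alias(rewrite(c), n)
    case other => other
  }
  (projectList.map(rewrite), aliases)
}

// `st.col AS col1` above Generate becomes `_gen_alias_0 AS col1`,
// while `st.col AS _gen_alias_0` is added to the Project below Generate.
val (top, pushedDown) =
  pruneNestedFields(Seq(Alias(GetField(Attr("st"), "col"), "col1")))
```

This mirrors the `_gen_alias_608#608` attribute visible in the optimized plan below.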

Benchmark after the change:

 [info] Java HotSpot(TM) 64-Bit Server VM 1.8.0_202-b08 on Mac OS X 10.14.4
 [info] Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
 [info] generate big nested struct array:         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 [info] ------------------------------------------------------------------------------------------------------------------------
 [info] generate big nested struct array wholestage off            311            331          28          0.2        5188.6       1.0X
 [info] generate big nested struct array wholestage on            297            312          15          0.2        4947.3       1.0X
 [info]

The query plan:

== Physical Plan ==
 Project [col#592, _gen_alias_608#608 AS col1#599, arr_col#603]
 +- Generate explode(st#596.arr), [col#592, _gen_alias_608#608], false, [arr_col#603]
    +- Project [_1#587 AS col#592, named_struct(col, _1#587, arr, _2#588) AS st#596, _1#587 AS _gen_alias_608#608]
       +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(in
 put[0, scala.Tuple2, true]))._1, true, false) AS _1#587, mapobjects(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4),
 if (isnull(lambdavariable(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), true))) null else named_struct(_1,        staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue102,              MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), true))._1, true, false), _2, staticinvoke(class org.apache.spark.unsafe.types.UTF8String,    StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), true))._2,      true, false), _3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString,                                                 knownnotnull(lambdavariable(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), true))._3, true, false), _4,            staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue102,              MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), true))._4, true, false)), knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2,      None) AS _2#588]
          +- Scan[obj#586]

This behavior is controlled by a SQL config spark.sql.optimizer.expression.nestedPruning.enabled.
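The optimization could then be toggled at runtime, for example like this (a minimal sketch; it assumes an active SparkSession named `spark`):

```scala
// Enable the nested-field pruning for Generate introduced by this patch.
spark.conf.set("spark.sql.optimizer.expression.nestedPruning.enabled", "true")

// Or disable it to compare query plans with and without the pruning.
spark.conf.set("spark.sql.optimizer.expression.nestedPruning.enabled", "false")
```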

How was this patch tested?

Added benchmark.

@SparkQA

SparkQA commented May 18, 2019

Test build #105516 has finished for PR 24637 at commit 6cab5ac.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented May 19, 2019

viirya changed the title from "[SPARK-27707][SQL] Prune unnecessary nested fields from Generate" to "[SPARK-27707][SQL] Prune unnecessary nested fields from Generate to address performance issue in explode" on May 19, 2019
@uzadude
Contributor

uzadude commented May 19, 2019

@viirya - looks great! Exactly what I had in mind, but wasn't sure how to implement it.

@SparkQA

SparkQA commented May 19, 2019

Test build #105522 has finished for PR 24637 at commit f036649.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented May 19, 2019

retest this please.

@SparkQA

SparkQA commented May 19, 2019

Test build #105526 has finished for PR 24637 at commit f036649.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

cc @dbtsai

case Project(projectList, child)
    if SQLConf.get.nestedSchemaPruningEnabled && canProjectPushThrough(child) =>
  getAliasSubMap(projectList)
case Project(projectList, child) => getAliasSubMap(projectList)
Member

dongjoon-hyun commented May 20, 2019

@viirya. Sorry, but this is a regression on all the existing code. We should avoid the getAliasSubMap invocation. https://github.com/apache/spark/pull/24637/files#diff-a636a87d8843eeccca90140be91d4fafR635 doesn't prevent the getAliasSubMap invocation inside unapply, does it?

Member Author

I see. If so, I need to make a little change to prevent it. Will change it later.

Member

Thanks!

@SparkQA

SparkQA commented May 20, 2019

Test build #105564 has finished for PR 24637 at commit beb8993.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 20, 2019

Test build #105568 has finished for PR 24637 at commit caea246.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

p.copy(child = g.copy(child = newChild, unrequiredChildIndex = unrequiredIndices))

// prune unrequired nested fields
case p @ Project(projectList, g: Generate) =>
Contributor

Why do we need to special-case Generate? I see there is a general case for nested column pruning below.

Member Author

I think this is the more general case you are looking for:

case p @ NestedColumnAliasing(nestedFieldToAlias, attrToAliases) =>
NestedColumnAliasing.replaceToAliases(p, nestedFieldToAlias, attrToAliases)

The general case does pruning only when the flag (nestedSchemaPruningEnabled) is enabled, and it only considers operators that a nested projection can be pushed through. Generate isn't one of them, so the general case doesn't cover this one.

Member Author

Generate is special for another reason: we can't prune an output from its child even if only a nested field of that output is used in the top project list, because the generator itself could still use it.

Member

Shall we add if SQLConf.get.nestedSchemaPruningEnabled?

Member Author

nestedSchemaPruningEnabled is for pruning nested fields from a logical relation, but this fix doesn't address the same cause. Even for data sources whose nested fields can't be pruned, it is still useful to apply this fix.

Member

Out of curiosity, if spark.sql.optimizer.nestedSchemaPruning.enabled isn't part of this optimization, why don't we need another configuration, or fix the doc of spark.sql.optimizer.nestedSchemaPruning.enabled?

Member Author

This looks like a general fix regardless of the nestedSchemaPruning setting. Do we need a config to disable it?

Member

My impression was that we need a configuration, but I think you or @dongjoon-hyun have more context than me about nested pruning. @cloud-fan, @dongjoon-hyun, @gatorsmile, can you make a call here on whether we need a config or not?

Member Author

nestedSchemaPruning is vague, as it can refer to nested pruning in scans or to general nested pruning in other operators. Currently the config affects scan nested pruning. I feel it is not tightly related to the fix here, because this fix isn't for scans.

If we are considering a config here, I think we should either add a different config or fix the doc of nestedSchemaPruning to explicitly indicate that it also covers general nested pruning.

Member Author

Let me fix the doc of nestedSchemaPruning and apply this pruning when nestedSchemaPruning is enabled.

@SparkQA

SparkQA commented May 22, 2019

Test build #105650 has finished for PR 24637 at commit 7a790ff.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 14, 2019

Test build #106494 has finished for PR 24637 at commit ef97ffc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

Retest this please.

@SparkQA

SparkQA commented Jun 16, 2019

Test build #106546 has finished for PR 24637 at commit ef97ffc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

Retest this please.

@SparkQA

SparkQA commented Jun 19, 2019

Test build #106651 has finished for PR 24637 at commit ef97ffc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

range/limit/sum:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
range/limit/sum wholestage off                      191            205          19       2738.4           0.4       1.0X
range/limit/sum wholestage on                       112            124          13       4699.4           0.2       1.7X
Member

Ur, this is irrelevant, but the ratio looks weird.

Member

This seems to be improved for some reason. It is consistently better in both @viirya's and my tests.

[info] OpenJDK 64-Bit Server VM 1.8.0_212-b04 on Linux 3.10.0-862.3.2.el7.x86_64
[info] Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
[info] range/limit/sum:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] range/limit/sum wholestage off                      222            226           6       2359.5           0.4       1.0X
[info] range/limit/sum wholestage on                       114            121           8       4608.4           0.2       2.0X

@viirya
Member Author

viirya commented Jul 12, 2019

Thanks @dongjoon-hyun for the advice! I will add a few more test cases targeting other Generators.

@dongjoon-hyun
Member

Thank you so much, @viirya !

@dongjoon-hyun
Member

Thank you for adding a new test. Are you going to add more tests since Stack is one of them? In fact, we need more to be exhaustive.

Another approach is simply reducing the scope to the original goal. We could match only Explode in this PR, like the following:

case p @ Project(projectList, g: Generate) if ...
case p @ Project(projectList, g @ Generate(_: Explode, _, _, _, _, _)) if ...

Later, to cover more patterns, I think we need unapply and a white-list approach like the following.

  private def canProjectPushThrough(plan: LogicalPlan) = plan match {
    case _: GlobalLimit => true
    ...
    case _ => false
  }

Since this PR has been here for a long time, how about finishing with Explode first, @viirya?

@SparkQA

SparkQA commented Jul 17, 2019

Test build #107788 has finished for PR 24637 at commit 9c225f3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Jul 18, 2019

@dongjoon-hyun I added more generators. I think all existing generators should now be covered by the test.

@SparkQA

SparkQA commented Jul 18, 2019

Test build #107827 has finished for PR 24637 at commit 5511445.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Jul 18, 2019

retest this please

@SparkQA

SparkQA commented Jul 18, 2019

Test build #107834 has finished for PR 24637 at commit 5511445.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

dongjoon-hyun commented Jul 18, 2019

Thank you for adding more. Then, please use a white-list approach for those four expressions.

case p @ Project(projectList, g @ Generate(e: Explode, _, _, _, _, _)) if canPruneXXX(e) && 

cc @cloud-fan and @gatorsmile

@viirya
Member Author

viirya commented Jul 19, 2019

I'm fine with adding a white-list, though I think this approach is not generator-specific. It is more conservative and safer, anyway.

@SparkQA

SparkQA commented Jul 19, 2019

Test build #107870 has finished for PR 24637 at commit 0821444.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

dongjoon-hyun left a comment

+1, LGTM.
Thank you so much for working on this.
Merged to master.

@cloud-fan
Contributor

We hit an exception caused by this rule. The plan becomes invalid after optimization:

+- *(2) !Project [_gen_alias_68718#68718L AS cardinality#68575L, _gen_alias_68719#68719 AS durationSec#68576, _gen_alias_68720#68720 AS group#68578, _gen_alias_68721#68721 AS jobUuid#68579, _gen_alias_68722#68722 AS suite#68584, _gen_alias_68723#68723 AS testcase#68585, sha1(cast(_gen_alias_68721#68721 as binary)) AS jobSha#68600, sha1(cast(concat(_gen_alias_68722#68722, -, _gen_alias_68720#68720, -, cast(_gen_alias_68718#68718L as string), -, _gen_alias_68723#68723) as binary)) AS caseSha#68615]
      +- *(2) Generate explode(results#64594), false, [flattenRuns#68572]
         +- *(2) Project [results#64594]
            +- *(2) Sort [startTime#68717 DESC NULLS LAST], true, 0

We generate _gen_alias attributes in the parent Project but they are not available in the child Generate.

@viirya Can you help to take a look? thanks!

@viirya
Member Author

viirya commented Jan 8, 2020

@cloud-fan Yes, I will look at it tomorrow. Do you have a test case? If not, I will try to reproduce it.

@cloud-fan
Contributor

It's a very long query and I'm trying to minimize it. Let's see if there is some clue in the query plan.

@dongjoon-hyun
Member

Thank you for reporting, @cloud-fan !

@dongjoon-hyun
Member

Could you file a JIRA for that, @cloud-fan ?

@viirya
Member Author

viirya commented Jan 8, 2020

Is flattenRuns (generatorOutput) also not in the parent Project?

@viirya
Member Author

viirya commented Jan 8, 2020

I re-checked the current rule and still cannot find a clue from it or the above query plan. At first glance, I suspected there might be a nested column access in the top Project but not in Generate, but I tested that and re-checked the rule, and it looks fine. @cloud-fan, do you have more clues?
