@viirya
Member

@viirya viirya commented Jun 19, 2021

What changes were proposed in this pull request?

This patch refactors the evaluation of subexpressions.

There are two changes:

  1. Clean up subexpression code after evaluation to avoid duplicate evaluation.
  2. Evaluate all children subexpressions when evaluating a subexpression.

Why are the changes needed?

Currently, subexpressionEliminationForWholeStageCodegen returns the generated code of the subexpressions, and the caller simply puts that code into its code block. We need more flexible evaluation here. For example, for the Filter operator's subexpression evaluation, we may need to evaluate only a particular subexpression for one predicate. The current approach cannot satisfy this requirement.
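
As an illustration only, the two proposed changes can be modelled in a few lines of Scala. This is a simplified sketch and not the code in this PR: `ExprCode` here holds just a string, and `SubExprEvalSketch`, `SubExprState` and `evaluate` are hypothetical names chosen for the example.

```scala
object SubExprEvalSketch {
  // Simplified stand-in for Spark's ExprCode: only the generated code string is
  // modelled, and it is mutable so that it can be cleared once it has been emitted.
  final class ExprCode(var code: String)

  // A subexpression's generated code plus the states of the subexpressions it uses.
  final case class SubExprState(eval: ExprCode, children: Seq[SubExprState] = Seq.empty)

  // Emits code for the given states. Children are emitted before their parent, and
  // each code block is cleared after emission, so asking again emits nothing for it.
  def evaluate(states: Iterable[SubExprState]): String = {
    val out = new StringBuilder
    states.foreach { state =>
      out.append(evaluate(state.children))
      if (state.eval.code.nonEmpty) {
        out.append(state.eval.code).append('\n')
        state.eval.code = ""
      }
    }
    out.toString
  }
}
```

With a state `b` whose only child is `a`, calling `evaluate(Seq(b))` emits the code of `a` followed by the code of `b`. A later `evaluate(Seq(a, b))` returns an empty string because both blocks were already cleared, which is what makes it safe to evaluate a single predicate's subexpressions on demand.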

Does this PR introduce any user-facing change?

No

How was this patch tested?

Existing tests.

@github-actions github-actions bot added the SQL label Jun 19, 2021
@SparkQA

SparkQA commented Jun 19, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44555/

@SparkQA

SparkQA commented Jun 19, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44555/

@SparkQA

SparkQA commented Jun 19, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44557/

@SparkQA

SparkQA commented Jun 19, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44557/

@SparkQA

SparkQA commented Jun 20, 2021

Test build #140030 has finished for PR 32980 at commit e8a03f5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class SubExprEliminationState(

@HyukjinKwon
Member

cc @rednaxelafx too FYI

@viirya
Member Author

viirya commented Jun 20, 2021

cc @maropu @cloud-fan

*
* @param codes Strings representing the codes that evaluate common subexpressions.
* @param codes all `SubExprEliminationState` representing the codes that evaluate common
* subexpressions.
Contributor

this is a bit hard to understand. what's the difference between SubExprEliminationState here and in states?

Member Author

They're the same SubExprEliminationState. states is used as a map when we look for subexpressions to replace in an expression. codes holds all the values in the map, in the order in which we created them.

Now that I think about it more, maybe we don't need to keep the sequence (codes), as this PR cleans up child subexpressions during evaluation. The order of evaluation no longer seems important.

@SparkQA

SparkQA commented Jun 21, 2021

Test build #140069 has finished for PR 32980 at commit 777b9a4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 21, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44597/

childrenSubExprs += subExprEliminationExprs(e)
case _ =>
}
val state = SubExprEliminationState(eval.code, eval.isNull, eval.value,
Contributor

seems it's simpler if we define SubExprEliminationState as SubExprEliminationState(eval: ExprValue, children: ...)

Member Author

@viirya viirya Jun 21, 2021

You mean SubExprEliminationState(eval: ExprCode, children: ...)?

@viirya
Member Author

viirya commented Jun 21, 2021

Hmm, directly using the values in the map causes some test failures.

@SparkQA

SparkQA commented Jun 21, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44627/

@SparkQA

SparkQA commented Jun 21, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44627/

@SparkQA

SparkQA commented Jun 21, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44630/

@SparkQA

SparkQA commented Jun 21, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44630/

@SparkQA

SparkQA commented Jun 22, 2021

Test build #140099 has finished for PR 32980 at commit c774797.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 22, 2021

Test build #140102 has finished for PR 32980 at commit 4574b30.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* evaluating this subexpression, we should evaluate all children
* subexpressions first. This is used if we want to selectively evaluate
* particular subexpressions, instead of all at once. In the case, we need
* to make sure we evaluate all children subexpressions too.
Contributor

Your previous PR improves EquivalentExpressions to always return child subexpression first. It seems that PR is not useful after this PR because we track the children explicitly?

Member Author

Not exactly. We still need child subexpressions to be returned first, so that a child subexpression is codegen-ed and put into the map before its parent. When we codegen the parent subexpression, we can then look up the child subexpression in the map and attach it as a child of the parent.

Member Author

@viirya viirya Jun 22, 2021

Actually, I have a new idea for how to codegen subexpressions in child-parent order without sorting. It is more reliable than the sorting approach. I will open another PR for that.

@viirya
Member Author

viirya commented Jun 24, 2021

Any more thoughts? @cloud-fan @maropu

@viirya
Member Author

viirya commented Jun 25, 2021

@cloud-fan Could you take another look? Thanks!

* evaluating a subexpression, this method will clean up the code block to avoid duplicate
* evaluation.
*/
def evaluateSubExprEliminationState(subExprStates: Iterable[SubExprEliminationState]): String = {
Member

nit: Iterable -> Seq?

Member Author

All of its callers use Iterable. If we change it to Seq here, every caller needs to add .toSeq.

* expressions and populates the mapping of common subexpressions to the generated code snippets.
*
* The generated code snippet for subexpression is wrapped in `SubExprEliminationState`, which
* contains a `ExprCode` and the children `SubExprEliminationState` if any. The `ExprCode`
Member

nit: a ExprCode -> an ExprCode

childrenSubExprs += subExprEliminationExprs(e)
case _ =>
}
val state = SubExprEliminationState(eval, childrenSubExprs.toSeq.reverse)
Member

childrenSubExprs.toSeq.reverse -> childrenSubExprs.reverse?

Member

btw, how about moving .reverse into the SubExprEliminationState side if we always need to reverse it:

object SubExprEliminationState {
  def apply(eval: ExprCode, children: Seq[SubExprEliminationState]): SubExprEliminationState = {
    new SubExprEliminationState(eval, children.reverse)
  }
}

Member Author

okay

val childrenSubExprs = mutable.ArrayBuffer.empty[SubExprEliminationState]
exprs.head.foreach {
case e if subExprEliminationExprs.contains(e) =>
childrenSubExprs += subExprEliminationExprs(e)
Member

Q: Is it difficult to add some tests for this new behaviour?

Member Author

Let me add a few tests.

Member Author

Added new test.

object SubExprEliminationState {
def apply(
eval: ExprCode,
children: Seq[SubExprEliminationState] = Seq.empty): SubExprEliminationState = {
Contributor

nit: def apply(eval: ExprCode): .... If the children parameter is also provided, the default case class apply should work.

Member Author

This is for @maropu's comment #32980 (comment).

Member Author

Or you mean to also add def apply(eval: ExprCode) here?

Member Author

Added def apply(eval: ExprCode).
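
For readers following this thread, here is a minimal sketch of what a companion object with both factory methods could look like. It is an assumption-laden illustration, not the merged code: `ExprCode` is reduced to a one-field case class, and the wrapper object `SubExprStateFactorySketch` exists only to keep the example self-contained. It also assumes a Scala version (2.12.2 or later) in which a hand-written `apply` with the same signature takes the place of the compiler-synthesized one.

```scala
object SubExprStateFactorySketch {
  // Simplified stand-in for Spark's ExprCode.
  final case class ExprCode(code: String)

  final case class SubExprEliminationState(
      eval: ExprCode,
      children: Seq[SubExprEliminationState])

  object SubExprEliminationState {
    // No children: the case for a subexpression that uses no other subexpression.
    def apply(eval: ExprCode): SubExprEliminationState =
      new SubExprEliminationState(eval, Seq.empty)

    // Same signature as the synthesized case-class apply, so callers that pass
    // children always get them stored in reversed order.
    def apply(
        eval: ExprCode,
        children: Seq[SubExprEliminationState]): SubExprEliminationState =
      new SubExprEliminationState(eval, children.reverse)
  }
}
```

One design consequence worth noting: `new SubExprEliminationState(...)` and `copy(...)` bypass these factories, so only construction through `apply` reverses the children.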

// Collects other subexpressions from the children.
val childrenSubExprs = mutable.ArrayBuffer.empty[SubExprEliminationState]
exprs.head.foreach {
case e if subExprEliminationExprs.contains(e) =>
Contributor

We need to add some comments to explain the assumption: this code works because EquivalentExpressions returns child expressions first.

Contributor

BTW collecting child expressions here looks really inefficient, but I don't have a better idea for now ...

Member Author

I see. These are not general expressions but only the special (subexpression) ones, so we collect child expressions over a limited set rather than over every expression. It only becomes costly if there are many subexpressions and they are deeply nested.

Member Author

> We need to add some comments to explain the assumption: this code works because EquivalentExpressions returns child expressions first.

As I commented before, I plan to remove the sorting. A better idea is to first add the SubExprEliminationState to the map (before codegen). Then, during codegen, we can look at the map to chain the children.
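
To make the ordering assumption discussed in this thread concrete, here is a small self-contained sketch. Everything in it is hypothetical (the `Expr` tree, `State`, and `buildStates` are not Spark classes); it only mirrors the idea of walking an expression and picking up states that are already in the map.

```scala
import scala.collection.mutable

object ChildCollectionSketch {
  // Minimal expression tree; `foreach` visits the node itself first, then its children.
  sealed trait Expr {
    def children: Seq[Expr]
    def foreach(f: Expr => Unit): Unit = { f(this); children.foreach(_.foreach(f)) }
  }
  final case class Attr(name: String) extends Expr { val children: Seq[Expr] = Nil }
  final case class Add(left: Expr, right: Expr) extends Expr {
    val children: Seq[Expr] = Seq(left, right)
  }

  // Hypothetical state: a label for the generated code plus the child states it uses.
  final case class State(label: String, children: Seq[State])

  // `commonExprs` must list child subexpressions before their parents. Otherwise a
  // parent is processed while the map is still missing its children.
  def buildStates(commonExprs: Seq[Expr]): Map[Expr, State] = {
    val states = mutable.LinkedHashMap.empty[Expr, State]
    commonExprs.zipWithIndex.foreach { case (expr, i) =>
      val found = mutable.ArrayBuffer.empty[State]
      // Pick up every node of this expression that already has a state in the map.
      expr.foreach { e => states.get(e).foreach(found += _) }
      states(expr) = State(s"subExpr_$i", found.toSeq)
    }
    states.toMap
  }
}
```

Given `inner = Add(a, b)` and `outer = Add(inner, a)`, passing `Seq(inner, outer)` links `inner` as a child of `outer`, while passing `Seq(outer, inner)` leaves `outer` with no children. That silent difference is why the child-first assumption deserves a comment in the code.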


val (codes, subExprsMap, exprCodes) = if (nonSplitExprCode.map(_.length).sum > splitThreshold) {
val needSplit = nonSplitCode.map(_.eval.code.length).sum > SQLConf.get.methodSplitThreshold
val (subExprsMap, exprCodes) = if (needSplit) {
Contributor

Not related to this PR: so here we repeat the logic of generating SubExprEliminationStates with splitting the code? nonSplitCode is totally wasted?

Member Author

Previously it was lazy, so we could generate the non-split code conditionally. Now we generate subexpressions in a nested way, so it can no longer be lazy: the SubExprEliminationStates are needed to generate the nested code.

Contributor

@cloud-fan cloud-fan left a comment

LGTM except a few minor comments

@SparkQA

SparkQA commented Jun 29, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44885/

@SparkQA

SparkQA commented Jun 29, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44885/

@SparkQA

SparkQA commented Jun 30, 2021

Test build #140369 has finished for PR 32980 at commit 2ecb592.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class MergedBlockMetaRequest extends AbstractMessage implements RequestMessage
  • public class MergedBlockMetaSuccess extends AbstractResponseMessage
  • public abstract class AbstractFetchShuffleBlocks extends BlockTransferMessage
  • public class FetchShuffleBlockChunks extends AbstractFetchShuffleBlocks
  • public class FetchShuffleBlocks extends AbstractFetchShuffleBlocks
  • final case class FileNameSpec(prefix: String, suffix: String)
  • class AvroSchemaHelper(avroSchema: Schema, avroPath: Seq[String])
  • class DecimalOps(FractionalOps):
  • class IntegralExtensionOps(IntegralOps):
  • class FractionalExtensionOps(FractionalOps):
  • class StringExtensionOps(StringOps):
  • new_class = type("NameType", (NameTypeHolder,),
  • class GroupBy(Generic[T_Frame], metaclass=ABCMeta):
  • class DataFrameGroupBy(GroupBy[DataFrame]):
  • class SeriesGroupBy(GroupBy[Series]):
  • new_class = type("NameType", (NameTypeHolder,),
  • class SparkIndexOpsMethods(Generic[T_IndexOps], metaclass=ABCMeta):
  • class SparkSeriesMethods(SparkIndexOpsMethods["ps.Series"]):
  • class SparkIndexMethods(SparkIndexOpsMethods["ps.Index"]):
  • class RollingAndExpanding(Generic[T_Frame], metaclass=ABCMeta):
  • class RollingLike(RollingAndExpanding[T_Frame]):
  • class Rolling(RollingLike[T_Frame]):
  • class RollingGroupby(RollingLike[T_Frame]):
  • class ExpandingLike(RollingAndExpanding[T_Frame]):
  • class Expanding(ExpandingLike[T_Frame]):
  • class ExpandingGroupby(ExpandingLike[T_Frame]):
  • sealed trait FieldName extends LeafExpression with Unevaluable
  • case class UnresolvedFieldName(name: Seq[String]) extends FieldName
  • case class ResolvedFieldName(name: Seq[String]) extends FieldName
  • case class Cast(
  • case class GetTimestampWithoutTZ(
  • case class ParseToTimestampWithoutTZ(
  • case class RebalancePartitions(
  • trait AlterTableCommand extends UnaryCommand
  • case class AlterTableDropColumns(
  • case class AlterTableRenameColumn(
  • new SparkException(s"Cannot find catalog plugin class for catalog '$name': $pluginClassName")
  • new SparkException("Cannot instantiate abstract catalog plugin class for " +
  • new SparkException(s"Can not load in UserDefinedType $
  • final class ParquetReadState
  • case class MergingSessionsExec(
  • class MergingSessionsIterator(
  • trait StatefulOperatorCustomMetric
  • case class StatefulOperatorCustomSumMetric(name: String, desc: String)
  • trait TestGroupState[S] extends GroupState[S]

@SparkQA

SparkQA commented Jun 30, 2021

Test build #140394 has finished for PR 32980 at commit 014bc8b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Jun 30, 2021

@maropu Any more comments? Otherwise I will merge this later. Thanks.

@viirya
Member Author

viirya commented Jun 30, 2021

Thanks for review! Merging to master.

@viirya viirya closed this in 064230d Jun 30, 2021
@viirya viirya deleted the subexpr-eval branch June 30, 2021 05:15
@maropu
Member

maropu commented Jul 1, 2021

Thank you, @viirya. Late LGTM.

// at least two nodes) as the cost of doing it is expected to be low.

val subExprCode = s"${addNewFunction(fnName, fn)}($INPUT_ROW);"
subexprFunctions += s"${addNewFunction(fnName, fn)}($INPUT_ROW);"
Contributor

nit: shall we use subexprFunctions += subExprCode here? otherwise we are calling addNewFunction twice.

Member Author

Oh yes. Since the functions in the class are kept in a map, the second call just overwrites the first. But yes, we should use subExprCode. Let me submit a followup.
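
The nit above can be illustrated with a small sketch. `FunctionRegistry` below is a hypothetical stand-in for the codegen context, not Spark's API; the point is only that binding the generated call to a value once and reusing it avoids a second registration.

```scala
import scala.collection.mutable

object AddFunctionOnceSketch {
  // Hypothetical registry standing in for the codegen context.
  final class FunctionRegistry {
    private val functions = mutable.Map.empty[String, String]
    var registrations = 0

    // Registers the function body under its name and returns the name to call.
    def addNewFunction(name: String, body: String): String = {
      registrations += 1
      functions(name) = body // registering the same name again overwrites the entry
      name
    }

    def size: Int = functions.size
  }

  // Builds the call string once and reuses it, so the function is registered once.
  def registerAndCall(
      registry: FunctionRegistry,
      fnName: String,
      fn: String,
      calls: mutable.ArrayBuffer[String]): String = {
    val subExprCode = s"${registry.addNewFunction(fnName, fn)}(i);"
    calls += subExprCode
    subExprCode
  }
}
```

Because the registry is keyed by name, a duplicate call would not change the result, but it would do the registration work twice for no benefit.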

viirya added a commit that referenced this pull request Jul 13, 2021
…of addNewFunction

### What changes were proposed in this pull request?

A followup of #32980. We should use `subExprCode` to avoid a duplicate call of `addNewFunction`.

### Why are the changes needed?

Avoid a duplicate call of `addNewFunction`.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing test.

Closes #33305 from viirya/fix-minor.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>