Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
kiszk committed Dec 19, 2017
1 parent 31914c0 commit 0e45c19
Show file tree
Hide file tree
Showing 8 changed files with 21 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,12 @@ class CodegenContext {
* this field. The argument is the name of the mutable state variable.
* If left blank, the field will be default-initialized.
* @param forceInline whether the declaration and initialization code may be inlined rather than
* compacted. Please set `true` into forceInline, if you want to access the
* status fast (e.g. frequently accessed) or if you want to use the original
* variable name
* @param useFreshName If this is false and forceInline is true, the name is not changed
* compacted. Please set `true` into forceInline for one of the followings:
* 1. use the original name of the status
* 2. expect to non-frequently generate the status
* (e.g. not much sort operators in one stage)
* @param useFreshName If this is false and the mutable state ends up inlining in the outer
* class, the name is not changed
* @return the name of the mutable state variable, which is the original name or fresh name if
* the variable is inlined to the outer class, or an array access if the variable is to
* be stored in an array of variables of the same type.
Expand All @@ -221,7 +223,6 @@ class CodegenContext {
* 2. its type is primitive type and the total number of the inlined mutable variables
* is less than `CodeGenerator.OUTER_CLASS_VARIABLES_THRESHOLD`
* 3. its type is multi-dimensional array
* A primitive type variable will be inlined into outer class when the total number of
* When a variable is compacted into an array, the max size of the array for compaction
* is given by `CodeGenerator.MUTABLESTATEARRAY_SIZE_LIMIT`.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ case class Like(left: Expression, right: Expression) extends StringRegexExpressi
if (rVal != null) {
val regexStr =
StringEscapeUtils.escapeJava(escape(rVal.asInstanceOf[UTF8String].toString()))
// inline mutable state since not many Like operations in a task
val pattern = ctx.addMutableState(patternClass, "patternLike",
v => s"""$v = ${patternClass}.compile("$regexStr");""", forceInline = true)

Expand Down Expand Up @@ -193,6 +194,7 @@ case class RLike(left: Expression, right: Expression) extends StringRegexExpress
if (rVal != null) {
val regexStr =
StringEscapeUtils.escapeJava(rVal.asInstanceOf[UTF8String].toString())
// inline mutable state since not many RLike operations in a task
val pattern = ctx.addMutableState(patternClass, "patternRLike",
v => s"""$v = ${patternClass}.compile("$regexStr");""", forceInline = true)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ case class SortExec(
// Initialize the class member variables. This includes the instance of the Sorter and
// the iterator to return sorted rows.
val thisPlan = ctx.addReferenceObj("plan", this)
// inline mutable state since not many Sort operations in a task
sorterVariable = ctx.addMutableState(classOf[UnsafeExternalRowSorter].getName, "sorter",
v => s"$v = $thisPlan.createSorter();", forceInline = true)
val metrics = ctx.addMutableState(classOf[TaskMetrics].getName, "metrics",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupp

override def doProduce(ctx: CodegenContext): String = {
// Right now, InputAdapter is only used when there is one input RDD.
// inline mutable state since an inputAdaptor in a task
val input = ctx.addMutableState("scala.collection.Iterator", "input", v => s"$v = inputs[0];",
forceInline = true)
val row = ctx.freshName("row")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -605,12 +605,15 @@ case class HashAggregateExec(
}

// Create a name for the iterator from the regular hash map.
val iterTerm = ctx.addMutableState(classOf[KVIterator[UnsafeRow, UnsafeRow]].getName, "mapIter")
// inline mutable state since not many aggregation operations in a task
val iterTerm = ctx.addMutableState(classOf[KVIterator[UnsafeRow, UnsafeRow]].getName,
"mapIter", forceInline = true)
// create hashMap
val hashMapClassName = classOf[UnsafeFixedWidthAggregationMap].getName
hashMapTerm = ctx.addMutableState(hashMapClassName, "hashMap",
v => s"$v = $thisPlan.createHashMap();")
sorterTerm = ctx.addMutableState(classOf[UnsafeKVExternalSorter].getName, "sorter")
sorterTerm = ctx.addMutableState(classOf[UnsafeKVExternalSorter].getName, "sorter",
forceInline = true)

val doAgg = ctx.freshName("doAggregateWithKeys")
val peakMemory = metricTerm(ctx, "peakMemory")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ case class SampleExec(
val samplerClass = classOf[PoissonSampler[UnsafeRow]].getName
val initSampler = ctx.freshName("initSampler")

// inline mutable state since not many Sample operations in a task
val sampler = ctx.addMutableState(s"$samplerClass<UnsafeRow>", "sampleReplace",
v => {
val initSamplerFuncName = ctx.addNewFunction(initSampler,
Expand Down Expand Up @@ -317,7 +318,7 @@ case class SampleExec(
v => s"""
| $v = new $samplerClass<UnsafeRow>($lowerBound, $upperBound, false);
| $v.setSeed(${seed}L + partitionIndex);
""".stripMargin.trim, forceInline = true)
""".stripMargin.trim)

s"""
| if ($sampler.sample() != 0) {
Expand Down Expand Up @@ -370,6 +371,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
val ev = ExprCode("", "false", value)
val BigInt = classOf[java.math.BigInteger].getName

// inline mutable state since not many Range operations in a task
val taskContext = ctx.addMutableState("TaskContext", "taskContext",
v => s"$v = TaskContext.get();", forceInline = true)
val inputMetrics = ctx.addMutableState("InputMetrics", "inputMetrics",
Expand Down Expand Up @@ -435,10 +437,6 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
| }
""".stripMargin)

// Right now, Range is only used when there is one upstream.
val input = ctx.addMutableState("scala.collection.Iterator", "input",
v => s"$v = inputs[0];", forceInline = true)

val localIdx = ctx.freshName("localIdx")
val localEnd = ctx.freshName("localEnd")
val range = ctx.freshName("range")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ case class BroadcastHashJoinExec(
// At the end of the task, we update the avg hash probe.
val avgHashProbe = metricTerm(ctx, "avgHashProbe")

// inline mutable state since not many join operations in a task
val relationTerm = ctx.addMutableState(clsName, "relation",
v => s"""
| $v = (($clsName) $broadcast.value()).asReadOnlyCopy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ case class SortMergeJoinExec(
*/
private def genScanner(ctx: CodegenContext): (String, String) = {
// Create class member for next row from both sides.
// inline mutable state since not many join operations in a task
val leftRow = ctx.addMutableState("InternalRow", "leftRow", forceInline = true)
val rightRow = ctx.addMutableState("InternalRow", "rightRow", forceInline = true)

Expand Down Expand Up @@ -575,6 +576,7 @@ case class SortMergeJoinExec(
override def needCopyResult: Boolean = true

override def doProduce(ctx: CodegenContext): String = {
// inline mutable state since not many join operations in a task
val leftInput = ctx.addMutableState("scala.collection.Iterator", "leftInput",
v => s"$v = inputs[0];", forceInline = true)
val rightInput = ctx.addMutableState("scala.collection.Iterator", "rightInput",
Expand Down

0 comments on commit 0e45c19

Please sign in to comment.