Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
spark

fixup

fixup

fixup
  • Loading branch information
zhztheplayer committed Mar 6, 2024
1 parent e9fdd6e commit 3b514b1
Show file tree
Hide file tree
Showing 17 changed files with 577 additions and 551 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import io.glutenproject.backendsapi.{BackendsApiManager, SparkPlanExecApi}
import io.glutenproject.execution._
import io.glutenproject.expression._
import io.glutenproject.expression.ConverterUtils.FunctionConfig
import io.glutenproject.extension.{FallbackBroadcastHashJoin, FallbackBroadcastHashJoinPrepQueryStage, TransformPreOverrides}
import io.glutenproject.extension.{FallbackBroadcastHashJoin, FallbackBroadcastHashJoinPrepQueryStage}
import io.glutenproject.extension.columnar.AddTransformHintRule
import io.glutenproject.extension.columnar.MiscColumnarRules.TransformPreOverrides
import io.glutenproject.substrait.expression.{ExpressionBuilder, ExpressionNode, WindowFunctionNode}
import io.glutenproject.utils.CHJoinValidateUtil
import io.glutenproject.vectorized.CHColumnarBatchSerializer
Expand Down Expand Up @@ -54,9 +55,7 @@ import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleEx
import org.apache.spark.sql.execution.joins.{BuildSideRelation, ClickHouseBuildSideRelation, HashedRelationBroadcastMode}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.utils.{CHExecUtil, PushDownUtil}
import org.apache.spark.sql.extension.ClickHouseAnalysis
import org.apache.spark.sql.extension.CommonSubexpressionEliminateRule
import org.apache.spark.sql.extension.RewriteDateTimestampComparisonRule
import org.apache.spark.sql.extension.{ClickHouseAnalysis, CommonSubexpressionEliminateRule, RewriteDateTimestampComparisonRule}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package io.glutenproject.execution
import io.glutenproject.backendsapi.BackendsApiManager
import io.glutenproject.expression._
import io.glutenproject.expression.ConverterUtils.FunctionConfig
import io.glutenproject.extension.RewriteTypedImperativeAggregate
import io.glutenproject.extension.columnar.RewriteTypedImperativeAggregate
import io.glutenproject.substrait.`type`.{TypeBuilder, TypeNode}
import io.glutenproject.substrait.{AggregationParams, SubstraitContext}
import io.glutenproject.substrait.expression.{AggregateFunctionNode, ExpressionBuilder, ExpressionNode, ScalarFunctionNode}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.glutenproject.extension
package io.glutenproject.extension.columnar

import io.glutenproject.GlutenConfig
import io.glutenproject.execution.ProjectExecTransformer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.glutenproject.extension
package io.glutenproject.extension.columnar

import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ApplyColumnarRulesAndInsertTransitions, ColumnarToRowExec, RowToColumnarExec, SparkPlan}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.glutenproject.extension
package io.glutenproject.extension.columnar

import io.glutenproject.GlutenConfig
import io.glutenproject.execution.SortExecTransformer
import io.glutenproject.extension.columnar.TransformHints

import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.rules.Rule
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.glutenproject.extension
package io.glutenproject.extension.columnar

import io.glutenproject.GlutenConfig
import io.glutenproject.execution.BroadcastHashJoinExecTransformer
import io.glutenproject.extension.columnar.{TRANSFORM_UNSUPPORTED, TransformHints}
import io.glutenproject.extension.GlutenPlan
import io.glutenproject.extension.columnar.MiscColumnarRules.TransformPostOverrides
import io.glutenproject.utils.PlanUtil

import org.apache.spark.rdd.RDD
Expand All @@ -33,36 +34,24 @@ import org.apache.spark.sql.execution.exchange.Exchange

// spotless:off
/**
* Note, this rule should only fallback to row-based plan if there is no harm.
* The follow case should be handled carefully
* Note, this rule should only fallback to row-based plan if there is no harm. The follow case
* should be handled carefully
*
* 1. A BHJ and the previous broadcast exchange is columnar
* We should still make the BHJ columnar, otherwise it will fail if
* the vanilla BHJ accept a columnar broadcast exchange, e.g.,
* 1. A BHJ and the previous broadcast exchange is columnar We should still make the BHJ columnar,
* otherwise it will fail if the vanilla BHJ accept a columnar broadcast exchange, e.g.,
*
* Scan Scan
* \ |
* \ Columnar Broadcast Exchange
* \ /
* BHJ
* |
* VeloxColumnarToRow
* |
* Project (unsupport columnar)
* Scan Scan \ | \ Columnar Broadcast Exchange \ / BHJ \| VeloxColumnarToRow \| Project (unsupport
* columnar)
*
* 2. The previous shuffle exchange stage is a columnar shuffle exchange
* We should use VeloxColumnarToRow rather than vanilla Spark ColumnarToRowExec, e.g.,
* 2. The previous shuffle exchange stage is a columnar shuffle exchange We should use
* VeloxColumnarToRow rather than vanilla Spark ColumnarToRowExec, e.g.,
*
* Scan
* |
* Columnar Shuffle Exchange
* |
* VeloxColumnarToRow
* |
* Project (unsupport columnar)
* Scan \| Columnar Shuffle Exchange \| VeloxColumnarToRow \| Project (unsupport columnar)
*
* @param isAdaptiveContext If is inside AQE
* @param originalPlan The vanilla SparkPlan without apply gluten transform rules
* @param isAdaptiveContext
* If is inside AQE
* @param originalPlan
* The vanilla SparkPlan without apply gluten transform rules
*/
// spotless:on
case class ExpandFallbackPolicy(isAdaptiveContext: Boolean, originalPlan: SparkPlan)
Expand Down Expand Up @@ -105,39 +94,40 @@ case class ExpandFallbackPolicy(isAdaptiveContext: Boolean, originalPlan: SparkP
transitionCost
}

// spotless:off
/**
* When making a stage fall back, it's possible that we need a ColumnarToRow to adapt to last
* stage's columnar output. So we need to evaluate the cost, i.e., the number of required
* ColumnarToRow between entirely fallback stage and last stage(s). Thus, we can avoid possible
* performance degradation caused by fallback policy.
* Note, this rule should only fallback to row-based plan if there is no harm.
* The follow case should be handled carefully
*
* spotless:off
* 1. A BHJ and the previous broadcast exchange is columnar
* We should still make the BHJ columnar, otherwise it will fail if
* the vanilla BHJ accept a columnar broadcast exchange, e.g.,
*
* Spark plan before applying fallback policy:
*
* ColumnarExchange
* ----------- | --------------- last stage
* HashAggregateTransformer
* Scan Scan
* \ |
* \ Columnar Broadcast Exchange
* \ /
* BHJ
* |
* ColumnarToRow
* VeloxColumnarToRow
* |
* Project
*
* To illustrate the effect if cost is not taken into account, here is spark plan
* after applying whole stage fallback policy (threshold = 1):
* Project (unsupport columnar)
*
* ColumnarExchange
* ----------- | --------------- last stage
* ColumnarToRow
* |
* HashAggregate
* |
* Project
* 2. The previous shuffle exchange stage is a columnar shuffle exchange
* We should use VeloxColumnarToRow rather than vanilla Spark ColumnarToRowExec, e.g.,
*
* So by considering the cost, the fallback policy will not be applied.
* Scan
* |
* Columnar Shuffle Exchange
* |
* VeloxColumnarToRow
* |
* Project (unsupport columnar)
*
* spotless:on
* @param isAdaptiveContext If is inside AQE
* @param originalPlan The vanilla SparkPlan without apply gluten transform rules
*/
// spotless:on
private def countStageFallbackTransitionCost(plan: SparkPlan): Int = {
var stageFallbackTransitionCost = 0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.glutenproject.extension
package io.glutenproject.extension.columnar

import io.glutenproject.GlutenConfig
import io.glutenproject.backendsapi.BackendsApiManager
Expand Down
Loading

0 comments on commit 3b514b1

Please sign in to comment.