Skip to content

Commit

Permalink
[GLUTEN-7690][CORE][CH][VL] Remove GlutenConfig from ColumnarRuleApplier
Browse files Browse the repository at this point in the history
  • Loading branch information
beliefer committed Oct 26, 2024
1 parent 619624a commit 395b0bf
Show file tree
Hide file tree
Showing 9 changed files with 21 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.gluten.backendsapi.clickhouse

import org.apache.gluten.GlutenConfig.{EXTENDED_COLUMNAR_POST_RULES, EXTENDED_COLUMNAR_TRANSFORM_RULES}
import org.apache.gluten.backendsapi.RuleApi
import org.apache.gluten.extension._
import org.apache.gluten.extension.columnar._
Expand Down Expand Up @@ -89,7 +90,7 @@ private object CHRuleApi {
injector.injectTransform(
c =>
intercept(
SparkPlanRules.extendedColumnarRule(c.conf.extendedColumnarTransformRules)(c.session)))
SparkPlanRules.extendedColumnarRule(c.session.sessionState.conf.getConf(EXTENDED_COLUMNAR_TRANSFORM_RULES))(c.session)))
injector.injectTransform(c => InsertTransitions(c.outputsColumnar))

// Gluten columnar: Fallback policies.
Expand All @@ -101,14 +102,14 @@ private object CHRuleApi {
SparkShimLoader.getSparkShims
.getExtendedColumnarPostRules()
.foreach(each => injector.injectPost(c => intercept(each(c.session))))
injector.injectPost(c => ColumnarCollapseTransformStages(c.conf))
injector.injectPost(c => ColumnarCollapseTransformStages())
injector.injectTransform(
c =>
intercept(SparkPlanRules.extendedColumnarRule(c.conf.extendedColumnarPostRules)(c.session)))
intercept(SparkPlanRules.extendedColumnarRule(c.session.sessionState.conf.getConf(EXTENDED_COLUMNAR_POST_RULES))(c.session)))

// Gluten columnar: Final rules.
injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session))
injector.injectFinal(c => GlutenFallbackReporter(c.conf, c.session))
injector.injectFinal(c => GlutenFallbackReporter(c.session))
injector.injectFinal(_ => RemoveFallbackTagRule())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ private object VeloxRuleApi {
SparkShimLoader.getSparkShims
.getExtendedColumnarPostRules()
.foreach(each => injector.injectPost(c => each(c.session)))
injector.injectPost(c => ColumnarCollapseTransformStages(c.conf))
injector.injectPost(c => ColumnarCollapseTransformStages())

// Gluten columnar: Final rules.
injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session))
injector.injectFinal(c => GlutenFallbackReporter(c.conf, c.session))
injector.injectFinal(c => GlutenFallbackReporter(c.session))
injector.injectFinal(_ => RemoveFallbackTagRule())
}

Expand Down Expand Up @@ -116,9 +116,9 @@ private object VeloxRuleApi {
SparkShimLoader.getSparkShims
.getExtendedColumnarPostRules()
.foreach(each => injector.inject(c => each(c.session)))
injector.inject(c => ColumnarCollapseTransformStages(c.conf))
injector.inject(c => ColumnarCollapseTransformStages())
injector.inject(c => RemoveGlutenTableCacheColumnarToRow(c.session))
injector.inject(c => GlutenFallbackReporter(c.conf, c.session))
injector.inject(c => GlutenFallbackReporter(c.session))
injector.inject(_ => RemoveFallbackTagRule())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.gluten.extension.columnar

import org.apache.gluten.GlutenConfig
import org.apache.gluten.extension.util.AdaptiveContext

import org.apache.spark.sql.SparkSession
Expand All @@ -30,9 +29,5 @@ object ColumnarRuleApplier {
class ColumnarRuleCall(
val session: SparkSession,
val ac: AdaptiveContext,
val outputsColumnar: Boolean) {
val conf: GlutenConfig = {
new GlutenConfig(session.sessionState.conf)
}
}
val outputsColumnar: Boolean) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.spark.sql.execution

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backend.Backend
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution._
Expand Down Expand Up @@ -114,7 +113,6 @@ case class InputIteratorTransformer(child: SparkPlan) extends UnaryTransformSupp
* generate/compile code.
*/
case class ColumnarCollapseTransformStages(
glutenConfig: GlutenConfig,
transformStageCounter: AtomicInteger = ColumnarCollapseTransformStages.transformStageCounter)
extends Rule[SparkPlan] {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.execution

import org.apache.gluten.GlutenConfig
import org.apache.gluten.GlutenConfig.{FALLBACK_REPORTER_ENABLED, VALIDATION_LOG_LEVEL}
import org.apache.gluten.events.GlutenPlanFallbackEvent
import org.apache.gluten.extension.GlutenPlan
import org.apache.gluten.extension.columnar.FallbackTags
Expand All @@ -31,12 +31,10 @@ import org.apache.spark.sql.execution.ui.GlutenEventUtils
* This rule is used to collect all fallback reason.
* 1. print fallback reason for each plan node 2. post all fallback reason using one event
*/
case class GlutenFallbackReporter(glutenConfig: GlutenConfig, spark: SparkSession)
extends Rule[SparkPlan]
with LogLevelUtil {
case class GlutenFallbackReporter(spark: SparkSession) extends Rule[SparkPlan] with LogLevelUtil {

override def apply(plan: SparkPlan): SparkPlan = {
if (!glutenConfig.enableFallbackReport) {
if (!spark.sessionState.conf.getConf(FALLBACK_REPORTER_ENABLED)) {
return plan
}
printFallbackReason(plan)
Expand All @@ -52,12 +50,14 @@ case class GlutenFallbackReporter(glutenConfig: GlutenConfig, spark: SparkSessio
}

private def printFallbackReason(plan: SparkPlan): Unit = {
val validationLogLevel = glutenConfig.validationLogLevel
plan.foreachUp {
case _: GlutenPlan => // ignore
case p: SparkPlan if FallbackTags.nonEmpty(p) =>
val tag = FallbackTags.get(p)
logFallbackReason(validationLogLevel, p.nodeName, tag.reason())
logFallbackReason(
spark.sessionState.conf.getConf(VALIDATION_LOG_LEVEL),
p.nodeName,
tag.reason())
// With in next round stage in AQE, the physical plan would be a new instance that
// can not preserve the tag, so we need to set the fallback reason to logical plan.
// Then we can be aware of the fallback reason for the whole plan.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ private object FallbackStrategiesSuite {
List(c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(), c.ac.originalPlan())),
List(
c => RemoveTopmostColumnarToRow(c.session, c.ac.isAdaptiveContext()),
_ => ColumnarCollapseTransformStages(GlutenConfig.getConf)
_ => ColumnarCollapseTransformStages()
),
List(_ => RemoveFallbackTagRule())
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ private object FallbackStrategiesSuite {
List(c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(), c.ac.originalPlan())),
List(
c => RemoveTopmostColumnarToRow(c.session, c.ac.isAdaptiveContext()),
_ => ColumnarCollapseTransformStages(GlutenConfig.getConf)
_ => ColumnarCollapseTransformStages()
),
List(_ => RemoveFallbackTagRule())
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ private object FallbackStrategiesSuite {
List(c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(), c.ac.originalPlan())),
List(
c => RemoveTopmostColumnarToRow(c.session, c.ac.isAdaptiveContext()),
_ => ColumnarCollapseTransformStages(GlutenConfig.getConf)
_ => ColumnarCollapseTransformStages()
),
List(_ => RemoveFallbackTagRule())
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ private object FallbackStrategiesSuite {
List(c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(), c.ac.originalPlan())),
List(
c => RemoveTopmostColumnarToRow(c.session, c.ac.isAdaptiveContext()),
_ => ColumnarCollapseTransformStages(GlutenConfig.getConf)
_ => ColumnarCollapseTransformStages()
),
List(_ => RemoveFallbackTagRule())
)
Expand Down

0 comments on commit 395b0bf

Please sign in to comment.