Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
adding more options
Browse files Browse the repository at this point in the history
Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>
  • Loading branch information
zhouyuan committed Apr 12, 2021
1 parent e3cc4a6 commit 360b0e7
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] {
// disable ColumnarBatchScanExec according to config
return false
}
if (!enableColumnarBatchScan) return false
new ColumnarBatchScanExec(plan.output, plan.scan)
case plan: FileSourceScanExec =>
if (plan.supportsColumnar) {
Expand All @@ -74,10 +75,13 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] {
}
plan
case plan: ProjectExec =>
if(!enableColumnarProjFilter) return false
new ColumnarConditionProjectExec(null, plan.projectList, plan.child)
case plan: FilterExec =>
if (!enableColumnarProjFilter) return false
new ColumnarConditionProjectExec(plan.condition, null, plan.child)
case plan: HashAggregateExec =>
if (!enableColumnarHashAgg) return false
new ColumnarHashAggregateExec(
plan.requiredChildDistributionExpressions,
plan.groupingExpressions,
Expand All @@ -87,8 +91,10 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] {
plan.resultExpressions,
plan.child)
case plan: UnionExec =>
if (!enableColumnarUnion) return false
new ColumnarUnionExec(plan.children)
case plan: ExpandExec =>
if (!enableColumnarExpand) return false
new ColumnarExpandExec(plan.projections, plan.output, plan.child)
case plan: SortExec =>
if (!enableColumnarSort) return false
Expand All @@ -100,6 +106,7 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] {
plan.child,
plan.canChangeNumPartitions)
case plan: ShuffledHashJoinExec =>
if (!enableColumnarShuffledHashJoin) return false
ColumnarShuffledHashJoinExec(
plan.leftKeys,
plan.rightKeys,
Expand All @@ -109,10 +116,12 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] {
plan.left,
plan.right)
case plan: BroadcastExchangeExec =>
if (!enableColumnarBroadcastExchange) return false
ColumnarBroadcastExchangeExec(plan.mode, plan.child)
case plan: BroadcastHashJoinExec =>
// We need to check if BroadcastExchangeExec can be converted to columnar-based.
// If not, BHJ should also be row-based.
if (!enableColumnarBroadcastJoin) return false
val left = plan.left
left match {
case exec: BroadcastExchangeExec =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,50 @@ class ColumnarPluginConfig(conf: SQLConf) {
// for all operators
val enableCpu = getCpu()

// enable or disable columnar batchscan
val enableColumnarBatchScan: Boolean =
conf.getConfString("spark.oap.sql.columnar.batchscan", "true").toBoolean && enableCpu

// enable or disable columnar hashagg
val enableColumnarHashAgg: Boolean =
conf.getConfString("spark.oap.sql.columnar.hashagg", "true").toBoolean && enableCpu

// enable or disable columnar project and filter
val enableColumnarProjFilter: Boolean =
conf.getConfString("spark.oap.sql.columnar.projfilter", "true").toBoolean && enableCpu

// enable or disable columnar sort
val enableColumnarSort: Boolean =
conf.getConfString("spark.oap.sql.columnar.sort", "false").toBoolean && enableCpu
conf.getConfString("spark.oap.sql.columnar.sort", "true").toBoolean && enableCpu

// enable or disable codegen columnar sort
val enableColumnarCodegenSort: Boolean =
conf.getConfString("spark.oap.sql.columnar.codegen.sort", "true").toBoolean && enableCpu
conf.getConfString("spark.oap.sql.columnar.codegen.sort", "true").toBoolean && enableColumnarSort

// enable or disable columnar window
val enableColumnarWindow: Boolean =
conf.getConfString("spark.oap.sql.columnar.window", "true").toBoolean && enableCpu

// enable or disable columnar shuffledhashjoin
val enableColumnarShuffledHashJoin: Boolean =
conf.getConfString("spark.oap.sql.columnar.shuffledhashjoin", "true").toBoolean && enableCpu

// enable or disable columnar sortmergejoin
// this should be set with preferSortMergeJoin=false
val enableColumnarSortMergeJoin: Boolean =
conf.getConfString("spark.oap.sql.columnar.sortmergejoin", "false").toBoolean && enableCpu
conf.getConfString("spark.oap.sql.columnar.sortmergejoin", "true").toBoolean && enableCpu

// enable or disable columnar union
val enableColumnarUnion: Boolean =
conf.getConfString("spark.oap.sql.columnar.union", "true").toBoolean && enableCpu

// enable or disable columnar expand
val enableColumnarExpand: Boolean =
conf.getConfString("spark.oap.sql.columnar.expand", "true").toBoolean && enableCpu

// enable or disable columnar broadcastexchange
val enableColumnarBroadcastExchange: Boolean =
conf.getConfString("spark.oap.sql.columnar.broadcastexchange", "true").toBoolean && enableCpu

// enable or disable NAN check
val enableColumnarNaNCheck: Boolean =
Expand Down

0 comments on commit 360b0e7

Please sign in to comment.