Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-196] clean up native sql options #215

Merged
merged 4 commits into from
Apr 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] {
val enableColumnarSort = columnarConf.enableColumnarSort
val enableColumnarWindow = columnarConf.enableColumnarWindow
val enableColumnarSortMergeJoin = columnarConf.enableColumnarSortMergeJoin
val enableColumnarBatchScan = columnarConf.enableColumnarBatchScan
val enableColumnarProjFilter = columnarConf.enableColumnarProjFilter
val enableColumnarHashAgg = columnarConf.enableColumnarHashAgg
val enableColumnarUnion = columnarConf.enableColumnarUnion
val enableColumnarExpand = columnarConf.enableColumnarExpand
val enableColumnarShuffledHashJoin = columnarConf.enableColumnarShuffledHashJoin
val enableColumnarBroadcastExchange = columnarConf.enableColumnarBroadcastExchange
val enableColumnarBroadcastJoin = columnarConf.enableColumnarBroadcastJoin

val testing = columnarConf.isTesting

private def tryConvertToColumnar(plan: SparkPlan): Boolean = {
Expand All @@ -62,6 +71,7 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] {
// disable ColumnarBatchScanExec according to config
return false
}
if (!enableColumnarBatchScan) return false
new ColumnarBatchScanExec(plan.output, plan.scan)
case plan: FileSourceScanExec =>
if (plan.supportsColumnar) {
Expand All @@ -74,10 +84,13 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] {
}
plan
case plan: ProjectExec =>
if(!enableColumnarProjFilter) return false
new ColumnarConditionProjectExec(null, plan.projectList, plan.child)
case plan: FilterExec =>
if (!enableColumnarProjFilter) return false
new ColumnarConditionProjectExec(plan.condition, null, plan.child)
case plan: HashAggregateExec =>
if (!enableColumnarHashAgg) return false
new ColumnarHashAggregateExec(
plan.requiredChildDistributionExpressions,
plan.groupingExpressions,
Expand All @@ -87,8 +100,10 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] {
plan.resultExpressions,
plan.child)
case plan: UnionExec =>
if (!enableColumnarUnion) return false
new ColumnarUnionExec(plan.children)
case plan: ExpandExec =>
if (!enableColumnarExpand) return false
new ColumnarExpandExec(plan.projections, plan.output, plan.child)
case plan: SortExec =>
if (!enableColumnarSort) return false
Expand All @@ -100,6 +115,7 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] {
plan.child,
plan.canChangeNumPartitions)
case plan: ShuffledHashJoinExec =>
if (!enableColumnarShuffledHashJoin) return false
ColumnarShuffledHashJoinExec(
plan.leftKeys,
plan.rightKeys,
Expand All @@ -109,10 +125,12 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] {
plan.left,
plan.right)
case plan: BroadcastExchangeExec =>
if (!enableColumnarBroadcastExchange) return false
ColumnarBroadcastExchangeExec(plan.mode, plan.child)
case plan: BroadcastHashJoinExec =>
// We need to check if BroadcastExchangeExec can be converted to columnar-based.
// If not, BHJ should also be row-based.
if (!enableColumnarBroadcastJoin) return false
val left = plan.left
left match {
case exec: BroadcastExchangeExec =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,49 +18,122 @@
package com.intel.oap

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.internal.SQLConf

/**
 * NUMA binding configuration for columnar execution.
 *
 * @param enableNumaBinding   whether NUMA binding is turned on
 * @param totalCoreRange      core ranges, one entry per NUMA node
 *                            (NOTE(review): defaults to null rather than an
 *                            empty array — callers must null-check; confirm
 *                            before changing)
 * @param numCoresPerExecutor number of cores per executor; -1 means unset
 */
case class ColumnarNumaBindingInfo(
    enableNumaBinding: Boolean,
    totalCoreRange: Array[String] = null,
    numCoresPerExecutor: Int = -1)

class ColumnarPluginConfig(conf: SQLConf) {
class ColumnarPluginConfig(conf: SQLConf) extends Logging {
/**
 * Detects whether this JVM is running on an Intel CPU by scanning
 * /proc/cpuinfo for the "GenuineIntel" vendor string. Columnar operators
 * are only enabled on Intel hardware; a warning is logged otherwise.
 *
 * NOTE(review): assumes a Linux host — /proc/cpuinfo will not exist on
 * other platforms and Source.fromFile will throw; confirm deployment targets.
 *
 * @return true on Intel CPUs, false otherwise
 */
def getCpu(): Boolean = {
  val source = scala.io.Source.fromFile("/proc/cpuinfo")
  // Read the whole file and always release the handle, even on failure.
  val lines = try source.mkString finally source.close()
  // TODO(): check CPU flags to enable/disable AVX512
  val isIntel = lines.contains("GenuineIntel")
  if (!isIntel) {
    logWarning("running on non-intel CPU, disable all columnar operators")
  }
  isIntel
}

// for all operators
val enableCpu = getCpu()

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing configuration options for the project/filter and hash-aggregate operators.

// enable or disable columnar batchscan
val enableColumnarBatchScan: Boolean =
conf.getConfString("spark.oap.sql.columnar.batchscan", "true").toBoolean && enableCpu

// enable or disable columnar hashagg
val enableColumnarHashAgg: Boolean =
conf.getConfString("spark.oap.sql.columnar.hashagg", "true").toBoolean && enableCpu

// enable or disable columnar project and filter
val enableColumnarProjFilter: Boolean =
conf.getConfString("spark.oap.sql.columnar.projfilter", "true").toBoolean && enableCpu

// enable or disable columnar sort
val enableColumnarSort: Boolean =
conf.getConfString("spark.sql.columnar.sort", "false").toBoolean
conf.getConfString("spark.oap.sql.columnar.sort", "true").toBoolean && enableCpu

// enable or disable codegen columnar sort
val enableColumnarCodegenSort: Boolean =
conf.getConfString("spark.sql.columnar.codegen.sort", "true").toBoolean
val enableColumnarNaNCheck: Boolean =
conf.getConfString("spark.sql.columnar.nanCheck", "false").toBoolean
val enableColumnarBroadcastJoin: Boolean =
conf.getConfString("spark.sql.columnar.sort.broadcastJoin", "true").toBoolean
conf.getConfString("spark.oap.sql.columnar.codegen.sort", "true").toBoolean && enableColumnarSort

// enable or disable columnar window
val enableColumnarWindow: Boolean =
conf.getConfString("spark.sql.columnar.window", "true").toBoolean
conf.getConfString("spark.oap.sql.columnar.window", "true").toBoolean && enableCpu

// enable or disable columnar shuffledhashjoin
val enableColumnarShuffledHashJoin: Boolean =
conf.getConfString("spark.oap.sql.columnar.shuffledhashjoin", "true").toBoolean && enableCpu

// enable or disable columnar sortmergejoin
// this should be set with preferSortMergeJoin=false
val enableColumnarSortMergeJoin: Boolean =
conf.getConfString("spark.oap.sql.columnar.sortmergejoin", "false").toBoolean
conf.getConfString("spark.oap.sql.columnar.sortmergejoin", "true").toBoolean && enableCpu

// enable or disable columnar union
val enableColumnarUnion: Boolean =
conf.getConfString("spark.oap.sql.columnar.union", "true").toBoolean && enableCpu

// enable or disable columnar expand
val enableColumnarExpand: Boolean =
conf.getConfString("spark.oap.sql.columnar.expand", "true").toBoolean && enableCpu

// enable or disable columnar broadcastexchange
val enableColumnarBroadcastExchange: Boolean =
conf.getConfString("spark.oap.sql.columnar.broadcastexchange", "true").toBoolean && enableCpu

// enable or disable NAN check
val enableColumnarNaNCheck: Boolean =
conf.getConfString("spark.oap.sql.columnar.nanCheck", "true").toBoolean

// enable or disable hashcompare in hashjoins or hashagg
val hashCompare: Boolean =
conf.getConfString("spark.oap.sql.columnar.hashCompare", "true").toBoolean

// enable or disable columnar BroadcastHashJoin
val enableColumnarBroadcastJoin: Boolean =
conf.getConfString("spark.oap.sql.columnar.broadcastJoin", "true").toBoolean && enableCpu

// enable or disable columnar wholestagecodegen
val enableColumnarWholeStageCodegen: Boolean =
conf.getConfString("spark.oap.sql.columnar.wholestagecodegen", "true").toBoolean && enableCpu

// enable or disable columnar exchange
val enableColumnarShuffle: Boolean = conf
.getConfString("spark.shuffle.manager", "sort")
.equals("org.apache.spark.shuffle.sort.ColumnarShuffleManager") && enableCpu

// for all perf tunings
// prefer to use columnar operators if set to true
val enablePreferColumnar: Boolean =
conf.getConfString("spark.oap.sql.columnar.preferColumnar", "false").toBoolean
val enableJoinOptimizationReplace: Boolean =
conf.getConfString("spark.oap.sql.columnar.joinOptimizationReplace", "false").toBoolean

// fallback to row operators if there are several continuous joins
val joinOptimizationThrottle: Integer =
conf.getConfString("spark.oap.sql.columnar.joinOptimizationLevel", "6").toInt
val enableColumnarWholeStageCodegen: Boolean =
conf.getConfString("spark.oap.sql.columnar.wholestagecodegen", "true").toBoolean
val enableColumnarShuffle: Boolean = conf
.getConfString("spark.shuffle.manager", "sort")
.equals("org.apache.spark.shuffle.sort.ColumnarShuffleManager")

val batchSize: Int =
conf.getConfString("spark.sql.execution.arrow.maxRecordsPerBatch", "10000").toInt

// enable or disable metrics in columnar wholestagecodegen operator
val enableMetricsTime: Boolean =
conf.getConfString(
"spark.oap.sql.columnar.wholestagecodegen.breakdownTime",
"false").toBoolean

// a folder to store the codegen files
val tmpFile: String =
conf.getConfString("spark.sql.columnar.tmp_dir", null)
conf.getConfString("spark.oap.sql.columnar.tmp_dir", null)

@deprecated val broadcastCacheTimeout: Int =
conf.getConfString("spark.sql.columnar.sort.broadcast.cache.timeout", "-1").toInt
val hashCompare: Boolean =
conf.getConfString("spark.oap.sql.columnar.hashCompare", "false").toBoolean

// Whether to spill the partition buffers when buffers are full.
// If false, the partition buffers will be cached in memory first,
// and the cached buffers will be spilled when reach maximum memory.
Expand All @@ -70,8 +143,11 @@ class ColumnarPluginConfig(conf: SQLConf) {
// The supported customized compression codec is lz4 and fastpfor.
val columnarShuffleUseCustomizedCompressionCodec: String =
conf.getConfString("spark.oap.sql.columnar.shuffle.customizedCompression.codec", "lz4")

// a helper flag to check if it's in unit test
val isTesting: Boolean =
conf.getConfString("spark.oap.sql.columnar.testing", "false").toBoolean

val numaBindingInfo: ColumnarNumaBindingInfo = {
val enableNumaBinding: Boolean =
conf.getConfString("spark.oap.sql.columnar.numaBinding", "false").toBoolean
Expand Down