From d8a4d4dfb2f947a65fff18a99605b6e3a19f1876 Mon Sep 17 00:00:00 2001 From: Yuan Date: Thu, 15 Apr 2021 15:25:10 +0800 Subject: [PATCH] [NSE-196] clean up native sql options (#215) * clean up native sql options Signed-off-by: Yuan Zhou * adding more options Signed-off-by: Yuan Zhou * adding more options Signed-off-by: Yuan Zhou * adding warning log for running on non-intel cpu Signed-off-by: Yuan Zhou --- .../com/intel/oap/ColumnarGuardRule.scala | 17 +++ .../com/intel/oap/ColumnarPluginConfig.scala | 114 +++++++++++++++--- 2 files changed, 112 insertions(+), 19 deletions(-) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarGuardRule.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarGuardRule.scala index eb6a153c0..22948f44f 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarGuardRule.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarGuardRule.scala @@ -52,11 +52,20 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] { val enableColumnarSort = columnarConf.enableColumnarSort val enableColumnarWindow = columnarConf.enableColumnarWindow val enableColumnarSortMergeJoin = columnarConf.enableColumnarSortMergeJoin + val enableColumnarBatchScan = columnarConf.enableColumnarBatchScan + val enableColumnarProjFilter = columnarConf.enableColumnarProjFilter + val enableColumnarHashAgg = columnarConf.enableColumnarHashAgg + val enableColumnarUnion = columnarConf.enableColumnarUnion + val enableColumnarExpand = columnarConf.enableColumnarExpand + val enableColumnarShuffledHashJoin = columnarConf.enableColumnarShuffledHashJoin + val enableColumnarBroadcastExchange = columnarConf.enableColumnarBroadcastExchange + val enableColumnarBroadcastJoin = columnarConf.enableColumnarBroadcastJoin private def tryConvertToColumnar(plan: SparkPlan): Boolean = { try { val columnarPlan = plan match { case plan: BatchScanExec => + if (!enableColumnarBatchScan) return false new ColumnarBatchScanExec(plan.output, plan.scan) case plan: FileSourceScanExec => if (plan.supportsColumnar) { @@ -73,10 +82,13 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] { } plan case plan: ProjectExec => + if(!enableColumnarProjFilter) return false new ColumnarConditionProjectExec(null, plan.projectList, plan.child) case plan: FilterExec => + if (!enableColumnarProjFilter) return false new ColumnarConditionProjectExec(plan.condition, null, plan.child) case plan: HashAggregateExec => + if (!enableColumnarHashAgg) return false new ColumnarHashAggregateExec( plan.requiredChildDistributionExpressions, plan.groupingExpressions, @@ -86,8 +98,10 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] { plan.resultExpressions, plan.child) case plan: UnionExec => + if (!enableColumnarUnion) return false new ColumnarUnionExec(plan.children) case plan: ExpandExec => + if (!enableColumnarExpand) return false new ColumnarExpandExec(plan.projections, plan.output, plan.child) case plan: SortExec => if (!enableColumnarSort) return false @@ -99,6 +113,7 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] { plan.child, plan.canChangeNumPartitions) case plan: ShuffledHashJoinExec => + if (!enableColumnarShuffledHashJoin) return false ColumnarShuffledHashJoinExec( plan.leftKeys, plan.rightKeys, @@ -108,10 +123,12 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] { plan.left, plan.right) case plan: BroadcastExchangeExec => + if (!enableColumnarBroadcastExchange) return false ColumnarBroadcastExchangeExec(plan.mode, plan.child) case plan: BroadcastHashJoinExec => // We need to check if BroadcastExchangeExec can be converted to columnar-based. // If not, BHJ should also be row-based. + if (!enableColumnarBroadcastJoin) return false val left = plan.left left match { case exec: BroadcastExchangeExec => diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala index b4354e0f7..6d574698f 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala @@ -18,6 +18,7 @@ package com.intel.oap import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging import org.apache.spark.sql.internal.SQLConf case class ColumnarNumaBindingInfo( @@ -25,42 +26,114 @@ case class ColumnarNumaBindingInfo( totalCoreRange: Array[String] = null, numCoresPerExecutor: Int = -1) {} -class ColumnarPluginConfig(conf: SQLConf) { +class ColumnarPluginConfig(conf: SQLConf) extends Logging { + def getCpu(): Boolean = { + val source = scala.io.Source.fromFile("/proc/cpuinfo") + val lines = try source.mkString finally source.close() + //TODO(): check CPU flags to enable/disable AVX512 + if (lines.contains("GenuineIntel")) { + return true + } else { + //System.out.println(actualSchemaRoot.getRowCount()); + logWarning("running on non-intel CPU, disable all columnar operators") + return false + } + } + + // for all operators + val enableCpu = getCpu() + + // enable or disable columnar batchscan + val enableColumnarBatchScan: Boolean = + conf.getConfString("spark.oap.sql.columnar.batchscan", "true").toBoolean && enableCpu + + // enable or disable columnar hashagg + val enableColumnarHashAgg: Boolean = + conf.getConfString("spark.oap.sql.columnar.hashagg", "true").toBoolean && enableCpu + + // enable or disable columnar project and filter + val enableColumnarProjFilter: Boolean = + conf.getConfString("spark.oap.sql.columnar.projfilter", "true").toBoolean && enableCpu + + // enable or disable columnar sort val enableColumnarSort: Boolean = - conf.getConfString("spark.sql.columnar.sort", "false").toBoolean + conf.getConfString("spark.oap.sql.columnar.sort", "true").toBoolean && enableCpu + + // enable or disable codegen columnar sort val enableColumnarCodegenSort: Boolean = - conf.getConfString("spark.sql.columnar.codegen.sort", "true").toBoolean - val enableColumnarNaNCheck: Boolean = - conf.getConfString("spark.sql.columnar.nanCheck", "false").toBoolean - val enableColumnarBroadcastJoin: Boolean = - conf.getConfString("spark.sql.columnar.sort.broadcastJoin", "true").toBoolean + conf.getConfString("spark.oap.sql.columnar.codegen.sort", "true").toBoolean && enableColumnarSort + + // enable or disable columnar window val enableColumnarWindow: Boolean = - conf.getConfString("spark.sql.columnar.window", "true").toBoolean + conf.getConfString("spark.oap.sql.columnar.window", "true").toBoolean && enableCpu + + // enable or disable columnar shuffledhashjoin + val enableColumnarShuffledHashJoin: Boolean = + conf.getConfString("spark.oap.sql.columnar.shuffledhashjoin", "true").toBoolean && enableCpu + + // enable or disable columnar sortmergejoin + // this should be set with preferSortMergeJoin=false val enableColumnarSortMergeJoin: Boolean = - conf.getConfString("spark.oap.sql.columnar.sortmergejoin", "false").toBoolean + conf.getConfString("spark.oap.sql.columnar.sortmergejoin", "true").toBoolean && enableCpu + + // enable or disable columnar union + val enableColumnarUnion: Boolean = + conf.getConfString("spark.oap.sql.columnar.union", "true").toBoolean && enableCpu + + // enable or disable columnar expand + val enableColumnarExpand: Boolean = + conf.getConfString("spark.oap.sql.columnar.expand", "true").toBoolean && enableCpu + + // enable or disable columnar broadcastexchange + val enableColumnarBroadcastExchange: Boolean = + conf.getConfString("spark.oap.sql.columnar.broadcastexchange", "true").toBoolean && enableCpu + + // enable or disable NAN check + val enableColumnarNaNCheck: Boolean = + conf.getConfString("spark.oap.sql.columnar.nanCheck", "true").toBoolean + + // enable or disable hashcompare in hashjoins or hashagg + val hashCompare: Boolean = + conf.getConfString("spark.oap.sql.columnar.hashCompare", "true").toBoolean + + // enable or disable columnar BroadcastHashJoin + val enableColumnarBroadcastJoin: Boolean = + conf.getConfString("spark.oap.sql.columnar.broadcastJoin", "true").toBoolean && enableCpu + + // enable or disable columnar wholestagecodegen + val enableColumnarWholeStageCodegen: Boolean = + conf.getConfString("spark.oap.sql.columnar.wholestagecodegen", "true").toBoolean && enableCpu + + // enable or disable columnar exchange + val enableColumnarShuffle: Boolean = conf + .getConfString("spark.shuffle.manager", "sort") + .equals("org.apache.spark.shuffle.sort.ColumnarShuffleManager") && enableCpu + + // for all perf turnings + // prefer to use columnar operators if set to true val enablePreferColumnar: Boolean = conf.getConfString("spark.oap.sql.columnar.preferColumnar", "false").toBoolean - val enableJoinOptimizationReplace: Boolean = - conf.getConfString("spark.oap.sql.columnar.joinOptimizationReplace", "false").toBoolean + + // fallback to row operators if there are several continous joins val joinOptimizationThrottle: Integer = conf.getConfString("spark.oap.sql.columnar.joinOptimizationLevel", "6").toInt - val enableColumnarWholeStageCodegen: Boolean = - conf.getConfString("spark.oap.sql.columnar.wholestagecodegen", "true").toBoolean - val enableColumnarShuffle: Boolean = conf - .getConfString("spark.shuffle.manager", "sort") - .equals("org.apache.spark.shuffle.sort.ColumnarShuffleManager") + val batchSize: Int = conf.getConfString("spark.sql.execution.arrow.maxRecordsPerBatch", "10000").toInt + + // enable or disable metrics in columnar wholestagecodegen operator val enableMetricsTime: Boolean = conf.getConfString( "spark.oap.sql.columnar.wholestagecodegen.breakdownTime", "false").toBoolean + + // a folder to store the codegen files val tmpFile: String = - conf.getConfString("spark.sql.columnar.tmp_dir", null) + conf.getConfString("spark.oap.sql.columnar.tmp_dir", null) + @deprecated val broadcastCacheTimeout: Int = conf.getConfString("spark.sql.columnar.sort.broadcast.cache.timeout", "-1").toInt - val hashCompare: Boolean = - conf.getConfString("spark.oap.sql.columnar.hashCompare", "false").toBoolean + // Whether to spill the partition buffers when buffers are full. // If false, the partition buffers will be cached in memory first, // and the cached buffers will be spilled when reach maximum memory. @@ -70,8 +143,11 @@ class ColumnarPluginConfig(conf: SQLConf) { // The supported customized compression codec is lz4 and fastpfor. val columnarShuffleUseCustomizedCompressionCodec: String = conf.getConfString("spark.oap.sql.columnar.shuffle.customizedCompression.codec", "lz4") + + // a helper flag to check if it's in unit test val isTesting: Boolean = conf.getConfString("spark.oap.sql.columnar.testing", "false").toBoolean + val numaBindingInfo: ColumnarNumaBindingInfo = { val enableNumaBinding: Boolean = conf.getConfString("spark.oap.sql.columnar.numaBinding", "false").toBoolean