diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala index 05f00329f..5e9601318 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala @@ -226,6 +226,9 @@ class GazellePluginConfig(conf: SQLConf) extends Logging { conf .getConfString("spark.oap.sql.columnar.shuffleSplitDefaultSize", "8192").toInt + val maxWholeStageCodegenColumnNum: Int = + conf.getConfString("spark.oap.sql.columnar.codegen.maxColumnNum", "30").toInt + val numaBindingInfo: GazelleNumaBindingInfo = { val enableNumaBinding: Boolean = conf.getConfString("spark.oap.sql.columnar.numaBinding", "false").toBoolean diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBasicPhysicalOperators.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBasicPhysicalOperators.scala index d72bb0800..25f74b44f 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBasicPhysicalOperators.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBasicPhysicalOperators.scala @@ -186,7 +186,7 @@ case class ColumnarConditionProjectExec( } } } - true + super.supportColumnarCodegen } def getKernelFunction(childTreeNode: TreeNode): TreeNode = { diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBroadcastHashJoinExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBroadcastHashJoinExec.scala index 3089986d8..e57aff5f1 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBroadcastHashJoinExec.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBroadcastHashJoinExec.scala @@ -295,7 +295,7 @@ case class ColumnarBroadcastHashJoinExec( override def getChild: SparkPlan = streamedPlan override def supportColumnarCodegen: Boolean = { - this.supportCodegen + this.supportCodegen && super.supportColumnarCodegen } val output_skip_alias = diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarHashAggregateExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarHashAggregateExec.scala index abba813f6..5dd6e3987 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarHashAggregateExec.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarHashAggregateExec.scala @@ -729,7 +729,7 @@ case class ColumnarHashAggregateExec( } } } - return true + super.supportColumnarCodegen } def getKernelFunction: TreeNode = { diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarShuffledHashJoinExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarShuffledHashJoinExec.scala index 795d40a56..051a9745a 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarShuffledHashJoinExec.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarShuffledHashJoinExec.scala @@ -287,7 +287,7 @@ case class ColumnarShuffledHashJoinExec( } override def supportColumnarCodegen: Boolean = { - this.supportCodegen + this.supportCodegen && super.supportColumnarCodegen } val output_skip_alias = diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarSortExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarSortExec.scala index 65fa729c3..af376ceb2 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarSortExec.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarSortExec.scala @@ -139,7 +139,7 @@ case class ColumnarSortExec( Seq(child.executeColumnar()) } - override def supportColumnarCodegen: Boolean = true + override def supportColumnarCodegen: Boolean = super.supportColumnarCodegen override def getBuildPlans: Seq[(SparkPlan, SparkPlan)] = child match { case c: ColumnarCodegenSupport if c.supportColumnarCodegen == true => diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarSortMergeJoinExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarSortMergeJoinExec.scala index 74115a621..5ce4d18cf 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarSortMergeJoinExec.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarSortMergeJoinExec.scala @@ -269,7 +269,7 @@ case class ColumnarSortMergeJoinExec( return false } } - true + super.supportColumnarCodegen } val output_skip_alias = diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWholeStageCodegenExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWholeStageCodegenExec.scala index 59d32ef20..85fa89932 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWholeStageCodegenExec.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWholeStageCodegenExec.scala @@ -46,7 +46,10 @@ trait ColumnarCodegenSupport extends SparkPlan { /** * Whether this SparkPlan supports whole stage codegen or not. */ - def supportColumnarCodegen: Boolean = true + def supportColumnarCodegen: Boolean = { + // TODO: support large num columns in WSCG + output.size <= GazellePluginConfig.getSessionConf.maxWholeStageCodegenColumnNum + } /** * Returns all the RDDs of ColumnarBatch which generates the input rows. @@ -91,7 +94,7 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I // This is not strictly needed because the codegen transformation happens after the columnar // transformation but just for consistency - override def supportColumnarCodegen: Boolean = true + override def supportColumnarCodegen: Boolean = super.supportColumnarCodegen override def supportsColumnar: Boolean = true