This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-46] CI: Add some TPC-H execution rounds for SMJ (#84)
Closes #46
zhztheplayer authored Feb 4, 2021
1 parent 1699336 commit 3d0c72c
Showing 25 changed files with 304 additions and 199 deletions.
22 changes: 17 additions & 5 deletions .github/workflows/tpch.yml
@@ -53,17 +53,29 @@ jobs:
git clone https://github.com/oap-project/arrow-data-source.git
cd arrow-data-source
mvn clean install -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn
- - name: Run Maven tests
+ - name: Run Maven tests - BHJ
run: |
cd core/
- mvn test -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn -DmembersOnlySuites=com.intel.oap.tpch -DtagsToInclude=com.intel.oap.tags.TestAndWriteLogs -DargLine="-Xmx1G -XX:MaxDirectMemorySize=500M -Dio.netty.allocator.numDirectArena=1"
+ mvn test -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn -DmembersOnlySuites=com.intel.oap.tpch -DtagsToInclude=com.intel.oap.tags.BroadcastHashJoinMode -DargLine="-Xmx1G -XX:MaxDirectMemorySize=500M -Dio.netty.allocator.numDirectArena=1"
env:
MALLOC_ARENA_MAX: "4"
MAVEN_OPTS: "-Xmx1G"
- COMMENT_TEXT_OUTPUT_PATH: "/tmp/comment_text.txt"
- COMMENT_IMAGE_OUTPUT_PATH: "/tmp/comment_image.png"
+ COMMENT_TEXT_OUTPUT_PATH: "/tmp/comment_text_1.txt"
+ COMMENT_IMAGE_OUTPUT_PATH: "/tmp/comment_image_1.png"
ENABLE_TPCH_TESTS: "true"
- - run: cml-publish /tmp/comment_image.png --md > /tmp/comment.md
+ - name: Run Maven tests - SMJ
+ run: |
+ cd core/
+ mvn test -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn -DmembersOnlySuites=com.intel.oap.tpch -DtagsToInclude=com.intel.oap.tags.SortMergeJoinMode -DargLine="-Xmx1G -XX:MaxDirectMemorySize=500M -Dio.netty.allocator.numDirectArena=1"
+ env:
+ MALLOC_ARENA_MAX: "4"
+ MAVEN_OPTS: "-Xmx1G"
+ COMMENT_TEXT_OUTPUT_PATH: "/tmp/comment_text_2.txt"
+ COMMENT_IMAGE_OUTPUT_PATH: "/tmp/comment_image_2.png"
+ ENABLE_TPCH_TESTS: "true"
+ - run: |
+ cml-publish /tmp/comment_image_1.png --md > /tmp/comment.md
+ cml-publish /tmp/comment_image_2.png --md >> /tmp/comment.md
- run: echo "::set-output name=event_path::${GITHUB_EVENT_PATH}"
id: output-envs
- uses: actions/upload-artifact@v2
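The two Maven invocations above select disjoint suites through ScalaTest's tag filter (-DtagsToInclude), so the TPC-H queries now run once per join mode. A minimal sketch of how tag-gated tests can be declared; the Tag objects and suite below are illustrative stand-ins for the project's actual com.intel.oap.tags classes, and assume ScalaTest 3.1+:

    import org.scalatest.Tag
    import org.scalatest.funsuite.AnyFunSuite

    // Illustrative string-named tags; -DtagsToInclude filters on these names.
    object BroadcastHashJoinMode extends Tag("com.intel.oap.tags.BroadcastHashJoinMode")
    object SortMergeJoinMode extends Tag("com.intel.oap.tags.SortMergeJoinMode")

    class TpchJoinModeSuite extends AnyFunSuite {
      test("TPC-H round under broadcast hash join", BroadcastHashJoinMode) {
        // run the query set with spark.oap.sql.columnar.sortmergejoin=false
      }
      test("TPC-H round under sort merge join", SortMergeJoinMode) {
        // run the query set with spark.oap.sql.columnar.sortmergejoin=true
      }
    }

With this layout, -DtagsToInclude=com.intel.oap.tags.SortMergeJoinMode selects only the second test, which is how the workflow splits one suite into two CI rounds.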
2 changes: 1 addition & 1 deletion core/src/main/scala/com/intel/oap/ColumnarGuardRule.scala
@@ -45,7 +45,7 @@ case class RowGuard(child: SparkPlan) extends SparkPlan {
}

case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] {
- val columnarConf = ColumnarPluginConfig.getConf(conf)
+ val columnarConf = ColumnarPluginConfig.getSessionConf
val preferColumnar = columnarConf.enablePreferColumnar
val optimizeLevel = columnarConf.joinOptimizationThrottle
val enableColumnarShuffle = columnarConf.enableColumnarShuffle
26 changes: 16 additions & 10 deletions core/src/main/scala/com/intel/oap/ColumnarPlugin.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.internal.SQLConf

case class ColumnarPreOverrides(conf: SparkConf) extends Rule[SparkPlan] {
- val columnarConf: ColumnarPluginConfig = ColumnarPluginConfig.getConf(conf)
+ val columnarConf: ColumnarPluginConfig = ColumnarPluginConfig.getSessionConf
var isSupportAdaptive: Boolean = true
val testing: Boolean = columnarConf.isTesting

@@ -288,7 +288,7 @@ case class ColumnarPreOverrides(conf: SparkConf) extends Rule[SparkPlan] {
}

case class ColumnarPostOverrides(conf: SparkConf) extends Rule[SparkPlan] {
- val columnarConf = ColumnarPluginConfig.getConf(conf)
+ val columnarConf = ColumnarPluginConfig.getSessionConf
var isSupportAdaptive: Boolean = true

def replaceWithColumnarPlan(plan: SparkPlan): SparkPlan = plan match {
@@ -319,10 +319,14 @@ case class ColumnarOverrideRules(session: SparkSession) extends ColumnarRule wit
def columnarEnabled =
session.sqlContext.getConf("org.apache.spark.example.columnar.enabled", "true").trim.toBoolean
def conf = session.sparkContext.getConf
- val rowGuardOverrides = ColumnarGuardRule(conf)
- val preOverrides = ColumnarPreOverrides(conf)
- val postOverrides = ColumnarPostOverrides(conf)
- val collapseOverrides = ColumnarCollapseCodegenStages(conf)

+ // Do not create the rules during class initialization, as we need to access
+ // SQLConf while creating them; at that point SQLConf may not exist yet.
+ def rowGuardOverrides = ColumnarGuardRule(conf)
+ def preOverrides = ColumnarPreOverrides(conf)
+ def postOverrides = ColumnarPostOverrides(conf)
+ def collapseOverrides = ColumnarCollapseCodegenStages(conf)

var isSupportAdaptive: Boolean = true

private def supportAdaptive(plan: SparkPlan): Boolean = {
@@ -344,17 +348,19 @@ case class ColumnarOverrideRules(session: SparkSession) extends ColumnarRule wit
override def preColumnarTransitions: Rule[SparkPlan] = plan => {
if (columnarEnabled) {
isSupportAdaptive = supportAdaptive(plan)
- preOverrides.setAdaptiveSupport(isSupportAdaptive)
- preOverrides(rowGuardOverrides(plan))
+ val rule = preOverrides
+ rule.setAdaptiveSupport(isSupportAdaptive)
+ rule(rowGuardOverrides(plan))
} else {
plan
}
}

override def postColumnarTransitions: Rule[SparkPlan] = plan => {
if (columnarEnabled) {
- postOverrides.setAdaptiveSupport(isSupportAdaptive)
- val tmpPlan = postOverrides(plan)
+ val rule = postOverrides
+ rule.setAdaptiveSupport(isSupportAdaptive)
+ val tmpPlan = rule(plan)
collapseOverrides(tmpPlan)
} else {
plan
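The val-to-def switch above is the point of this hunk: each rule reads configuration when it is constructed, so a rule built once during class initialization would freeze whatever SQLConf held at that moment (or fail outright if no session existed yet), while a def builds a fresh rule per query. A self-contained toy sketch of the difference, using a plain mutable map as a stand-in for SQLConf (no Spark required):

    object LazyRuleDemo extends App {
      // Stand-in for a session-scoped SQLConf that can change at runtime.
      val conf = scala.collection.mutable.Map("joinOptimizationLevel" -> "6")

      // Like the rules above: configuration is captured at construction time.
      case class GuardRule(throttle: Int)
      def buildRule(): GuardRule = GuardRule(conf("joinOptimizationLevel").toInt)

      class EagerRules { val guard = buildRule() } // built once, at initialization
      class LazyRules { def guard = buildRule() }  // rebuilt on every access

      val eager = new EagerRules
      val fresh = new LazyRules
      conf("joinOptimizationLevel") = "12" // e.g. a later SET command in the session

      println(eager.guard.throttle) // 6: stale, captured before the update
      println(fresh.guard.throttle) // 12: fresh, constructed after the update
    }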
72 changes: 37 additions & 35 deletions core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala
@@ -18,66 +18,67 @@
package com.intel.oap

import org.apache.spark.SparkConf
+ import org.apache.spark.sql.internal.SQLConf

case class ColumnarNumaBindingInfo(
enableNumaBinding: Boolean,
totalCoreRange: Array[String] = null,
numCoresPerExecutor: Int = -1) {}

- class ColumnarPluginConfig(conf: SparkConf) {
+ class ColumnarPluginConfig(conf: SQLConf) {
val enableColumnarSort: Boolean =
- conf.getBoolean("spark.sql.columnar.sort", defaultValue = false)
+ conf.getConfString("spark.sql.columnar.sort", "false").toBoolean
val enableColumnarNaNCheck: Boolean =
- conf.getBoolean("spark.sql.columnar.nanCheck", defaultValue = false)
+ conf.getConfString("spark.sql.columnar.nanCheck", "false").toBoolean
val enableColumnarBroadcastJoin: Boolean =
- conf.getBoolean("spark.sql.columnar.sort.broadcastJoin", defaultValue = true)
+ conf.getConfString("spark.sql.columnar.sort.broadcastJoin", "true").toBoolean
val enableColumnarWindow: Boolean =
- conf.getBoolean("spark.sql.columnar.window", defaultValue = true)
+ conf.getConfString("spark.sql.columnar.window", "true").toBoolean
val enableColumnarSortMergeJoin: Boolean =
- conf.getBoolean("spark.oap.sql.columnar.sortmergejoin", defaultValue = false)
+ conf.getConfString("spark.oap.sql.columnar.sortmergejoin", "false").toBoolean
val enablePreferColumnar: Boolean =
- conf.getBoolean("spark.oap.sql.columnar.preferColumnar", defaultValue = false)
+ conf.getConfString("spark.oap.sql.columnar.preferColumnar", "false").toBoolean
val enableJoinOptimizationReplace: Boolean =
- conf.getBoolean("spark.oap.sql.columnar.joinOptimizationReplace", defaultValue = false)
+ conf.getConfString("spark.oap.sql.columnar.joinOptimizationReplace", "false").toBoolean
val joinOptimizationThrottle: Integer =
- conf.getInt("spark.oap.sql.columnar.joinOptimizationLevel", defaultValue = 6)
+ conf.getConfString("spark.oap.sql.columnar.joinOptimizationLevel", "6").toInt
val enableColumnarWholeStageCodegen: Boolean =
- conf.getBoolean("spark.oap.sql.columnar.wholestagecodegen", defaultValue = true)
+ conf.getConfString("spark.oap.sql.columnar.wholestagecodegen", "true").toBoolean
val enableColumnarShuffle: Boolean = conf
- .get("spark.shuffle.manager", "sort")
+ .getConfString("spark.shuffle.manager", "sort")
.equals("org.apache.spark.shuffle.sort.ColumnarShuffleManager")
val batchSize: Int =
- conf.getInt("spark.sql.execution.arrow.maxRecordsPerBatch", defaultValue = 10000)
+ conf.getConfString("spark.sql.execution.arrow.maxRecordsPerBatch", "10000").toInt
val enableMetricsTime: Boolean =
- conf.getBoolean(
+ conf.getConfString(
"spark.oap.sql.columnar.wholestagecodegen.breakdownTime",
- defaultValue = false)
+ "false").toBoolean
val tmpFile: String =
- conf.getOption("spark.sql.columnar.tmp_dir").getOrElse(null)
+ conf.getConfString("spark.sql.columnar.tmp_dir", null)
@deprecated val broadcastCacheTimeout: Int =
- conf.getInt("spark.sql.columnar.sort.broadcast.cache.timeout", defaultValue = -1)
+ conf.getConfString("spark.sql.columnar.sort.broadcast.cache.timeout", "-1").toInt
val hashCompare: Boolean =
- conf.getBoolean("spark.oap.sql.columnar.hashCompare", defaultValue = false)
+ conf.getConfString("spark.oap.sql.columnar.hashCompare", "false").toBoolean
// Whether to spill the partition buffers when buffers are full.
// If false, the partition buffers will be cached in memory first,
// and the cached buffers will be spilled when reach maximum memory.
val columnarShufflePreferSpill: Boolean =
- conf.getBoolean("spark.oap.sql.columnar.shuffle.preferSpill", defaultValue = true)
+ conf.getConfString("spark.oap.sql.columnar.shuffle.preferSpill", "true").toBoolean
val columnarShuffleUseCustomizedCompression: Boolean =
- conf.getBoolean("spark.oap.sql.columnar.shuffle.customizedCompression", defaultValue = false)
+ conf.getConfString("spark.oap.sql.columnar.shuffle.customizedCompression", "false").toBoolean
val isTesting: Boolean =
- conf.getBoolean("spark.oap.sql.columnar.testing", defaultValue = false)
+ conf.getConfString("spark.oap.sql.columnar.testing", "false").toBoolean
val numaBindingInfo: ColumnarNumaBindingInfo = {
val enableNumaBinding: Boolean =
- conf.getBoolean("spark.oap.sql.columnar.numaBinding", defaultValue = false)
- if (enableNumaBinding == false) {
+ conf.getConfString("spark.oap.sql.columnar.numaBinding", "false").toBoolean
+ if (!enableNumaBinding) {
ColumnarNumaBindingInfo(false)
} else {
- val tmp = conf.getOption("spark.oap.sql.columnar.coreRange").getOrElse(null)
+ val tmp = conf.getConfString("spark.oap.sql.columnar.coreRange", null)
if (tmp == null) {
ColumnarNumaBindingInfo(false)
} else {
- val numCores = conf.getInt("spark.executor.cores", defaultValue = 1)
+ val numCores = conf.getConfString("spark.executor.cores", "1").toInt
val coreRangeList: Array[String] = tmp.split('|').map(_.trim)
ColumnarNumaBindingInfo(true, coreRangeList, numCores)
}
@@ -89,21 +90,22 @@ class ColumnarPluginConfig(conf: SparkConf) {
object ColumnarPluginConfig {
var ins: ColumnarPluginConfig = null
var random_temp_dir_path: String = null
- def getConf(conf: SparkConf): ColumnarPluginConfig = synchronized {
- if (ins == null) {
- ins = new ColumnarPluginConfig(conf)
- ins
- } else {
- ins
- }
- }

+ /**
+ * @deprecated We should avoid caching this value in the entire JVM. Use getSessionConf instead.
+ */
+ @Deprecated
def getConf: ColumnarPluginConfig = synchronized {
if (ins == null) {
- throw new IllegalStateException("ColumnarPluginConfig is not initialized yet")
- } else {
- ins
+ ins = getSessionConf
}
+ ins
}

+ def getSessionConf: ColumnarPluginConfig = {
+ new ColumnarPluginConfig(SQLConf.get)
+ }

def getBatchSize: Int = synchronized {
if (ins == null) {
10000
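The net effect of swapping SparkConf for SQLConf throughout this file: every key is now read from the mutable, session-scoped SQLConf instead of the SparkConf that is frozen when the SparkContext starts, so plugin settings can be changed per session with SET. A minimal sketch of that behavior, assuming a local Spark 3.x session; the plugin itself does not need to be loaded just to read the key:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.internal.SQLConf

    object SessionConfDemo extends App {
      val spark = SparkSession.builder().master("local[1]").appName("conf-demo").getOrCreate()

      // Flip a plugin switch for this session only; no JVM restart needed.
      spark.sql("SET spark.oap.sql.columnar.sortmergejoin=true")

      // ColumnarPluginConfig.getSessionConf reads through SQLConf.get, so it
      // would observe the value set above.
      val smjEnabled = SQLConf.get
        .getConfString("spark.oap.sql.columnar.sortmergejoin", "false")
        .toBoolean
      println(s"columnar sort merge join enabled: $smjEnabled") // true

      spark.stop()
    }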
@@ -48,7 +48,7 @@ case class ColumnarConditionProjectExec(
with AliasAwareOutputPartitioning
with Logging {

- val numaBindingInfo = ColumnarPluginConfig.getConf(sparkContext.getConf).numaBindingInfo
+ val numaBindingInfo = ColumnarPluginConfig.getConf.numaBindingInfo

val sparkConf: SparkConf = sparkContext.getConf

@@ -249,7 +249,7 @@ case class ColumnarConditionProjectExec(
numInputBatches.set(0)

child.executeColumnar().mapPartitions { iter =>
- ColumnarPluginConfig.getConf(sparkConf)
+ ColumnarPluginConfig.getConf
ExecutorManager.tryTaskSet(numaBindingInfo)
val condProj = ColumnarConditionProjector.create(
condition,
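Note the bare ColumnarPluginConfig.getConf at the top of the mapPartitions closure above: it is evaluated purely for its side effect, ensuring the JVM-wide cached config is initialized on each executor before the partition is processed. A runnable toy sketch of the same pattern; the Expensive object is an illustrative stand-in for config or native-library initialization:

    import org.apache.spark.sql.SparkSession

    // Cached once per JVM; touching it at the top of the closure pays the
    // initialization cost on the executor before per-row work starts.
    object Expensive {
      lazy val instance: Long = { Thread.sleep(100); 42L }
    }

    object PrimeSingletonDemo extends App {
      val spark = SparkSession.builder().master("local[2]").appName("prime").getOrCreate()
      val out = spark.sparkContext.parallelize(1 to 4, 2).mapPartitions { iter =>
        Expensive.instance // side effect only: prime the per-JVM singleton
        iter.map(_ + 1)
      }.collect()
      println(out.mkString(",")) // 2,3,4,5
      spark.stop()
    }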
@@ -27,7 +27,7 @@ import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

class ColumnarBatchScanExec(output: Seq[AttributeReference], @transient scan: Scan)
extends BatchScanExec(output, scan) {
- val tmpDir: String = ColumnarPluginConfig.getConf(sparkContext.getConf).tmpFile
+ val tmpDir: String = ColumnarPluginConfig.getConf.tmpFile
override def supportsColumnar(): Boolean = true
override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
@@ -55,7 +55,7 @@ case class ColumnarBroadcastHashJoinExec(
with HashJoin {

val sparkConf = sparkContext.getConf
- val numaBindingInfo = ColumnarPluginConfig.getConf(sparkContext.getConf).numaBindingInfo
+ val numaBindingInfo = ColumnarPluginConfig.getConf.numaBindingInfo
override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"),
@@ -431,7 +431,7 @@ case class ColumnarBroadcastHashJoinExec(

streamedPlan.executeColumnar().mapPartitions { streamIter =>
ExecutorManager.tryTaskSet(numaBindingInfo)
- ColumnarPluginConfig.getConf(sparkConf)
+ ColumnarPluginConfig.getConf
val execTempDir = ColumnarPluginConfig.getTempFile
val jarList = listJars.map(jarUrl => {
logWarning(s"Get Codegened library Jar ${jarUrl}")
@@ -50,7 +50,7 @@ class ColumnarDataSourceRDD(
inputSize: SQLMetric,
tmp_dir: String)
extends RDD[ColumnarBatch](sc, Nil) {
- val numaBindingInfo = ColumnarPluginConfig.getConf(sc.getConf).numaBindingInfo
+ val numaBindingInfo = ColumnarPluginConfig.getConf.numaBindingInfo

override protected def getPartitions: Array[Partition] = {
inputPartitions.zipWithIndex.map {
@@ -70,7 +70,7 @@ case class ColumnarHashAggregateExec(
with AliasAwareOutputPartitioning {

val sparkConf = sparkContext.getConf
- val numaBindingInfo = ColumnarPluginConfig.getConf(sparkContext.getConf).numaBindingInfo
+ val numaBindingInfo = ColumnarPluginConfig.getConf.numaBindingInfo
override def supportsColumnar = true

// Members declared in org.apache.spark.sql.execution.AliasAwareOutputPartitioning
@@ -70,7 +70,7 @@ case class ColumnarShuffledHashJoinExec(
with HashJoin {

val sparkConf = sparkContext.getConf
- val numaBindingInfo = ColumnarPluginConfig.getConf(sparkContext.getConf).numaBindingInfo
+ val numaBindingInfo = ColumnarPluginConfig.getConf.numaBindingInfo
override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"),
@@ -237,7 +237,7 @@ case class ColumnarShuffledHashJoinExec(
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
// we will use previous codegen join to handle joins with condition
if (condition.isDefined) {
- val enableHashCollisionCheck = ColumnarPluginConfig.getConf(sparkConf).hashCompare
+ val enableHashCollisionCheck = ColumnarPluginConfig.getConf.hashCompare
val hashTableType = if (enableHashCollisionCheck) 1 else 0
return getCodeGenIterator(hashTableType)
}
@@ -256,7 +256,7 @@
ExecutorManager.tryTaskSet(numaBindingInfo)
val hashRelationKernel = new ExpressionEvaluator()
val hashRelationBatchHolder: ListBuffer[ColumnarBatch] = ListBuffer()
- val enableHashCollisionCheck = ColumnarPluginConfig.getConf(sparkConf).hashCompare
+ val enableHashCollisionCheck = ColumnarPluginConfig.getConf.hashCompare
val hashTableType = if (enableHashCollisionCheck) 1 else 0
val hash_relation_function =
ColumnarConditionedProbeJoin
@@ -455,7 +455,7 @@ case class ColumnarShuffledHashJoinExec(
streamedPlan.executeColumnar().zipPartitions(buildPlan.executeColumnar()) {
(streamIter, buildIter) =>
ExecutorManager.tryTaskSet(numaBindingInfo)
- ColumnarPluginConfig.getConf(sparkConf)
+ ColumnarPluginConfig.getConf
val execTempDir = ColumnarPluginConfig.getTempFile
val jarList = listJars.map(jarUrl => {
logWarning(s"Get Codegened library Jar ${jarUrl}")
@@ -555,7 +555,7 @@ case class ColumnarShuffledHashJoinExec(
streamedPlan.executeColumnar().zipPartitions(buildPlan.executeColumnar()) {
(streamIter, buildIter) =>
ExecutorManager.tryTaskSet(numaBindingInfo)
- ColumnarPluginConfig.getConf(sparkConf)
+ ColumnarPluginConfig.getConf
val execTempDir = ColumnarPluginConfig.getTempFile
val jarList = listJars.map(jarUrl => {
logWarning(s"Get Codegened library Jar ${jarUrl}")
@@ -57,7 +57,7 @@ case class ColumnarSortExec(
with ColumnarCodegenSupport {

val sparkConf = sparkContext.getConf
- val numaBindingInfo = ColumnarPluginConfig.getConf(sparkContext.getConf).numaBindingInfo
+ val numaBindingInfo = ColumnarPluginConfig.getConf.numaBindingInfo
override def supportsColumnar = true
override protected def doExecute(): RDD[InternalRow] = {
throw new UnsupportedOperationException(s"ColumnarSortExec doesn't support doExecute")
@@ -194,7 +194,7 @@ case class ColumnarSortExec(
val res = if (!hasInput) {
Iterator.empty
} else {
- ColumnarPluginConfig.getConf(sparkConf)
+ ColumnarPluginConfig.getConf
val execTempDir = ColumnarPluginConfig.getTempFile
val jarList = listJars.map(jarUrl => {
logWarning(s"Get Codegened library Jar ${jarUrl}")
@@ -416,7 +416,7 @@ case class ColumnarSortMergeJoinExec(
val signature = getCodeGenSignature
val listJars = uploadAndListJars(signature)
right.executeColumnar().zipPartitions(left.executeColumnar()) { (streamIter, buildIter) =>
- ColumnarPluginConfig.getConf(sparkConf)
+ ColumnarPluginConfig.getConf
val execTempDir = ColumnarPluginConfig.getTempFile
val jarList = listJars.map(jarUrl => {
logWarning(s"Get Codegened library Jar ${jarUrl}")
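Since this operator is exactly what the new SMJ CI round exercises, a short usage sketch for reproducing a sort-merge-join plan locally; the first key comes from ColumnarPluginConfig above, and disabling the broadcast threshold is the standard way to keep the planner from picking a broadcast hash join. With plain Spark the script yields the vanilla SortMergeJoin node; the columnar variant additionally requires the plugin on the classpath:

    import org.apache.spark.sql.SparkSession

    object SmjRoundDemo extends App {
      val spark = SparkSession.builder().master("local[2]").appName("smj-round").getOrCreate()
      import spark.implicits._

      spark.sql("SET spark.oap.sql.columnar.sortmergejoin=true")
      spark.sql("SET spark.sql.autoBroadcastJoinThreshold=-1") // rule out BHJ

      val left = Seq((1, "a"), (2, "b")).toDF("k", "v1")
      val right = Seq((1, "x"), (3, "y")).toDF("k", "v2")
      // Expect a SortMergeJoin node (ColumnarSortMergeJoinExec when the plugin is active).
      left.join(right, "k").explain()
      spark.stop()
    }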
@@ -72,7 +72,7 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I
with ColumnarCodegenSupport {

val sparkConf = sparkContext.getConf
- val numaBindingInfo = ColumnarPluginConfig.getConf(sparkConf).numaBindingInfo
+ val numaBindingInfo = ColumnarPluginConfig.getConf.numaBindingInfo

override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
@@ -281,7 +281,7 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I
val buildPlan = p.getBuildPlan
val buildInputByteBuf = buildPlan.executeBroadcast[ColumnarHashedRelation]()
curRDD.mapPartitions { iter =>
- ColumnarPluginConfig.getConf(sparkConf)
+ ColumnarPluginConfig.getConf
ExecutorManager.tryTaskSet(numaBindingInfo)
// received broadcast value contain a hashmap and raw recordBatch
val beforeFetch = System.nanoTime()
@@ -413,7 +413,7 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I

curRDD.mapPartitions { iter =>
ExecutorManager.tryTaskSet(numaBindingInfo)
- ColumnarPluginConfig.getConf(sparkConf)
+ ColumnarPluginConfig.getConf
val execTempDir = ColumnarPluginConfig.getTempFile
val jarList = listJars.map(jarUrl => {
logWarning(s"Get Codegened library Jar ${jarUrl}")
