This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit [NSE-273] support spark311 (#272)
* support spark 3.0.2

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* update to use spark 302 in unit tests

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* support spark 311

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* fix

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* fix missing dep

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* fix broadcastexchange metrics

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* fix arrow data source

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* fix sum with decimal

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* fix c++ code

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* add partial sum support for decimal sum

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* fix hashagg in wscg

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* fix partial sum with number type

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* fix AQE shuffle copy

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* fix redundant read in shuffle

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* fix rebase

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* fix format

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* avoid unnecessary fallbacks

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* on-demand scala unit tests

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* clean up

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>
zhouyuan authored May 11, 2021
1 parent 2f5a532 commit 975fcaa
Showing 33 changed files with 521 additions and 992 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/unittests.yml
@@ -60,6 +60,7 @@ jobs:
ctest -R
scala-unit-test:
if: ${{ github.event.issue.pull_request && startsWith(github.event.comment.body, '@github-actions scala-unit-test') }}
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
@@ -82,8 +83,8 @@ jobs:
- name: Install Spark
run: |
cd /tmp
wget http://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz
tar -xf spark-3.0.0-bin-hadoop2.7.tgz
wget http://archive.apache.org/dist/spark/spark-3.0.2/spark-3.0.2-bin-hadoop2.7.tgz
tar -xf spark-3.0.2-bin-hadoop2.7.tgz
- name: Install OAP optimized Arrow (C++ libs)
run: |
cd /tmp
@@ -681,6 +681,11 @@ public void putFloats(int rowId, int count, byte[] src, int srcIndex) {
writer.setFloats(rowId, count, src, srcIndex);
}

@Override
public void putFloatsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {

}

@Override
public float getFloat(int rowId) {
return accessor.getFloat(rowId);
@@ -710,6 +715,11 @@ public void putDoubles(int rowId, int count, double[] src, int srcIndex) {
writer.setDoubles(rowId, count, src, srcIndex);
}

@Override
public void putDoublesLittleEndian(int rowId, int count, byte[] src, int srcIndex) {

}

@Override
public void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
writer.setDoubles(rowId, count, src, srcIndex);
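For context: Spark 3.1 added putFloatsLittleEndian and putDoublesLittleEndian to the writable column-vector API, which is why empty overrides appear above. A minimal sketch, in Scala and purely illustrative (the decode-and-delegate approach and the setFloats callback are assumptions, not this class's actual implementation), of how the little-endian bytes could be decoded before being handed to an existing bulk setter:

import java.nio.{ByteBuffer, ByteOrder}

// Decode `count` little-endian floats from `src` starting at `srcIndex`,
// then hand them to an existing bulk setter such as writer.setFloats.
def putFloatsLittleEndian(rowId: Int, count: Int, src: Array[Byte], srcIndex: Int)
                         (setFloats: (Int, Int, Array[Float], Int) => Unit): Unit = {
  val buf = ByteBuffer.wrap(src, srcIndex, count * 4).order(ByteOrder.LITTLE_ENDIAN)
  val values = new Array[Float](count)
  buf.asFloatBuffer().get(values)
  setFloats(rowId, count, values, 0)
}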
@@ -340,6 +340,7 @@ class ParquetFileFormat
val vectorizedReader = new VectorizedParquetRecordReader(
convertTz.orNull,
datetimeRebaseMode.toString,
"",
enableOffHeapColumnVector && taskContext.isDefined,
capacity)
val iter = new RecordReaderIterator(vectorizedReader)
@@ -358,7 +359,7 @@ class ParquetFileFormat
logDebug(s"Falling back to parquet-mr")
// ParquetRecordReader returns InternalRow
val readSupport = new ParquetReadSupport(
convertTz, enableVectorizedReader = false, datetimeRebaseMode)
convertTz, enableVectorizedReader = false, datetimeRebaseMode, SQLConf.LegacyBehaviorPolicy.LEGACY)
val reader = if (pushed.isDefined && enableRecordFilter) {
val parquetFilter = FilterCompat.get(pushed.get, null)
new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
@@ -450,7 +451,7 @@ object ParquetFileFormat extends Logging {
.map(ParquetFileFormat.readSchemaFromFooter(_, converter))
}

SchemaMergeUtils.mergeSchemasInParallel(sparkSession, filesToTouch, reader)
SchemaMergeUtils.mergeSchemasInParallel(sparkSession, null, filesToTouch, reader)
}

private[parquet] def readParquetFootersInParallel(
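For background: Spark 3.1 splits Parquet datetime rebasing into separate datetime and INT96 rebase modes, which is why VectorizedParquetRecordReader and ParquetReadSupport now take an extra rebase argument above. A hedged sketch of reading both modes from the session configuration (the config keys are the ones documented for Spark 3.1; the SparkSession setup is illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("rebase-mode-example")
  .master("local[*]")
  .getOrCreate()

// Valid values are LEGACY, CORRECTED and EXCEPTION, as documented for Spark 3.1.
val datetimeRebaseMode =
  spark.conf.get("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "EXCEPTION")
val int96RebaseMode =
  spark.conf.get("spark.sql.legacy.parquet.int96RebaseModeInRead", "EXCEPTION")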
10 changes: 10 additions & 0 deletions arrow-data-source/pom.xml
@@ -42,6 +42,16 @@
</pluginRepositories>

<dependencies>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
@@ -34,7 +34,7 @@ class ArrowDataSourceV2 extends FileDataSourceV2 {

override def getTable(options: CaseInsensitiveStringMap): Table = {
val paths = getPaths(options)
val tableName = getTableName(paths)
val tableName = getTableName(options, paths)
ArrowTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
}

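From the user's side this signature change is invisible; reading through the Arrow data source still looks like an ordinary DataFrame read. A hedged usage example (the "arrow" format name is assumed from the data source's naming in this repository; the path is a placeholder, and extra options for the underlying file format may be required — check the project README):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("arrow-datasource-example")
  .master("local[*]")
  .getOrCreate()

// Read a dataset through the Arrow data source; additional options may be
// needed depending on the underlying file format.
val df = spark.read.format("arrow").load("/path/to/data")
df.printSchema()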
6 changes: 0 additions & 6 deletions native-sql-engine/core/pom.xml
@@ -134,12 +134,6 @@
<groupId>com.intel.oap</groupId>
<version>${project.version}</version>
</dependency>
<dependency>
<artifactId>spark-arrow-datasource-standard</artifactId>
<groupId>com.intel.oap</groupId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.flatbuffers</groupId>
<artifactId>flatbuffers-java</artifactId>
@@ -70,7 +70,7 @@ public class VectorizedParquetArrowReader extends VectorizedParquetRecordReader

public VectorizedParquetArrowReader(String path, ZoneId convertTz, boolean useOffHeap,
int capacity, StructType sourceSchema, StructType readDataSchema, String tmp_dir) {
super(convertTz, "", useOffHeap, capacity);
super(convertTz, "CORRECTED", "LEGACY", useOffHeap, capacity);
this.capacity = capacity;
this.path = path;
this.tmp_dir = tmp_dir;
@@ -44,7 +44,7 @@ case class RowGuard(child: SparkPlan) extends SparkPlan {
def children: Seq[SparkPlan] = Seq(child)
}

case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] {
case class ColumnarGuardRule() extends Rule[SparkPlan] {
val columnarConf = ColumnarPluginConfig.getSessionConf
val preferColumnar = columnarConf.enablePreferColumnar
val optimizeLevel = columnarConf.joinOptimizationThrottle
@@ -103,8 +103,7 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] {
if (!enableColumnarShuffle) return false
new ColumnarShuffleExchangeExec(
plan.outputPartitioning,
plan.child,
plan.canChangeNumPartitions)
plan.child)
case plan: ShuffledHashJoinExec =>
if (!enableColumnarShuffledHashJoin) return false
ColumnarShuffledHashJoinExec(
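For reference: Spark 3.1 removed ShuffleExchangeExec's canChangeNumPartitions constructor flag in favour of a shuffleOrigin field, which is why the columnar exchange wrappers above no longer forward that argument. A hedged sketch of how the old flag maps onto the 3.1 API (the ShuffleOrigin values are taken from Spark 3.1's exchange package; treat the exact mapping as an assumption rather than this plugin's behaviour):

import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, REPARTITION_WITH_NUM, ShuffleOrigin}

// Old semantics: canChangeNumPartitions = true meant AQE was free to coalesce
// partitions; false typically came from a user-specified repartition(n).
def toShuffleOrigin(canChangeNumPartitions: Boolean): ShuffleOrigin =
  if (canChangeNumPartitions) ENSURE_REQUIREMENTS else REPARTITION_WITH_NUM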
@@ -18,7 +18,7 @@
package com.intel.oap

import com.intel.oap.execution._
import org.apache.spark.SparkConf
import org.apache.spark.internal.config._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.expressions._
@@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.internal.SQLConf

case class ColumnarPreOverrides(conf: SparkConf) extends Rule[SparkPlan] {
case class ColumnarPreOverrides() extends Rule[SparkPlan] {
val columnarConf: ColumnarPluginConfig = ColumnarPluginConfig.getSessionConf
var isSupportAdaptive: Boolean = true

@@ -116,14 +116,12 @@ case class ColumnarPreOverrides(conf: SparkConf) extends Rule[SparkPlan] {
if (isSupportAdaptive) {
new ColumnarShuffleExchangeAdaptor(
plan.outputPartitioning,
child,
plan.canChangeNumPartitions)
child)
} else {
CoalesceBatchesExec(
ColumnarShuffleExchangeExec(
plan.outputPartitioning,
child,
plan.canChangeNumPartitions))
child))
}
} else {
plan.withNewChildren(Seq(child))
@@ -199,20 +197,19 @@ case class ColumnarPreOverrides(conf: SparkConf) extends Rule[SparkPlan] {
case shuffle: ColumnarShuffleExchangeAdaptor =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
CoalesceBatchesExec(
ColumnarCustomShuffleReaderExec(plan.child, plan.partitionSpecs, plan.description))
ColumnarCustomShuffleReaderExec(plan.child, plan.partitionSpecs))
case ShuffleQueryStageExec(_, shuffle: ColumnarShuffleExchangeAdaptor) =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
CoalesceBatchesExec(
ColumnarCustomShuffleReaderExec(plan.child, plan.partitionSpecs, plan.description))
ColumnarCustomShuffleReaderExec(plan.child, plan.partitionSpecs))
case ShuffleQueryStageExec(_, reused: ReusedExchangeExec) =>
reused match {
case ReusedExchangeExec(_, shuffle: ColumnarShuffleExchangeAdaptor) =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
CoalesceBatchesExec(
ColumnarCustomShuffleReaderExec(
plan.child,
plan.partitionSpecs,
plan.description))
plan.partitionSpecs))
case _ =>
plan
}
@@ -277,7 +274,7 @@ case class ColumnarPreOverrides(conf: SparkConf) extends Rule[SparkPlan] {

}

case class ColumnarPostOverrides(conf: SparkConf) extends Rule[SparkPlan] {
case class ColumnarPostOverrides() extends Rule[SparkPlan] {
val columnarConf = ColumnarPluginConfig.getSessionConf
var isSupportAdaptive: Boolean = true

@@ -324,10 +321,12 @@ case class ColumnarOverrideRules(session: SparkSession) extends ColumnarRule wit

// Do not create rules in class initialization as we should access SQLConf while creating the rules. At this time
// SQLConf may not be there yet.
def rowGuardOverrides = ColumnarGuardRule(conf)
def preOverrides = ColumnarPreOverrides(conf)
def postOverrides = ColumnarPostOverrides(conf)
def collapseOverrides = ColumnarCollapseCodegenStages(conf)
def rowGuardOverrides = ColumnarGuardRule()
def preOverrides = ColumnarPreOverrides()
def postOverrides = ColumnarPostOverrides()

val columnarWholeStageEnabled = conf.getBoolean("spark.oap.sql.columnar.wholestagecodegen", defaultValue = true)
def collapseOverrides = ColumnarCollapseCodegenStages(columnarWholeStageEnabled)

var isSupportAdaptive: Boolean = true

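The overrides above no longer capture a SparkConf at construction time; as the comment in the diff notes, the rules are created lazily so they can see the active session's SQLConf. A minimal sketch of that pattern (the rule and its body are placeholders, not the plugin's actual logic):

import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.internal.SQLConf

case class ExamplePreOverride() extends Rule[SparkPlan] {
  // Read session-scoped settings at apply time so the active SQLConf is used.
  private def wholeStageEnabled: Boolean =
    SQLConf.get.getConfString("spark.oap.sql.columnar.wholestagecodegen", "true").toBoolean

  override def apply(plan: SparkPlan): SparkPlan =
    if (wholeStageEnabled) plan else plan // placeholder: no transformation either way
}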
@@ -112,11 +112,11 @@ class ColumnarPluginConfig(conf: SQLConf) extends Logging {
// for all perf turnings
// prefer to use columnar operators if set to true
val enablePreferColumnar: Boolean =
conf.getConfString("spark.oap.sql.columnar.preferColumnar", "false").toBoolean
conf.getConfString("spark.oap.sql.columnar.preferColumnar", "true").toBoolean

// fallback to row operators if there are several continous joins
val joinOptimizationThrottle: Integer =
conf.getConfString("spark.oap.sql.columnar.joinOptimizationLevel", "6").toInt
conf.getConfString("spark.oap.sql.columnar.joinOptimizationLevel", "12").toInt

val batchSize: Int =
conf.getConfString("spark.sql.execution.arrow.maxRecordsPerBatch", "10000").toInt
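Both defaults change here: columnar operators are now preferred by default, and the join-optimization throttle rises from 6 to 12. They can still be overridden per session; a hedged usage example (option names as in the diff, values purely illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("nse-config-example")
  .master("local[*]")
  // Fall back to row-based operators more aggressively than the new defaults.
  .config("spark.oap.sql.columnar.preferColumnar", "false")
  .config("spark.oap.sql.columnar.joinOptimizationLevel", "6")
  .getOrCreate()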
@@ -29,7 +29,10 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide, HashJoin}
import org.apache.spark.sql.execution.joins.{HashJoin,ShuffledJoin,BaseJoinExec}
import org.apache.spark.sql.execution.joins.HashedRelationInfo
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
@@ -51,9 +54,9 @@ case class ColumnarBroadcastHashJoinExec(
left: SparkPlan,
right: SparkPlan,
projectList: Seq[NamedExpression] = null)
extends BinaryExecNode
extends BaseJoinExec
with ColumnarCodegenSupport
with HashJoin {
with ShuffledJoin {

val sparkConf = sparkContext.getConf
val numaBindingInfo = ColumnarPluginConfig.getConf.numaBindingInfo
@@ -65,6 +68,11 @@
"joinTime" -> SQLMetrics.createTimingMetric(sparkContext, "join time"),
"fetchTime" -> SQLMetrics.createTimingMetric(sparkContext, "broadcast result fetch time"))

protected lazy val (buildPlan, streamedPlan) = buildSide match {
case BuildLeft => (left, right)
case BuildRight => (right, left)
}

val (buildKeyExprs, streamedKeyExprs) = {
require(
leftKeys.map(_.dataType) == rightKeys.map(_.dataType),
@@ -129,12 +137,14 @@ case class ColumnarBroadcastHashJoinExec(
throw new UnsupportedOperationException(
s"ColumnarBroadcastHashJoinExec doesn't support doExecute")
}

override def inputRDDs(): Seq[RDD[ColumnarBatch]] = streamedPlan match {
case c: ColumnarCodegenSupport if c.supportColumnarCodegen == true =>
c.inputRDDs
case _ =>
Seq(streamedPlan.executeColumnar())
}

override def getBuildPlans: Seq[(SparkPlan, SparkPlan)] = streamedPlan match {
case c: ColumnarCodegenSupport if c.supportColumnarCodegen == true =>
val childPlans = c.getBuildPlans
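Two Spark 3.1 changes drive this file's hunks: BuildLeft/BuildRight/BuildSide moved from org.apache.spark.sql.execution.joins to org.apache.spark.sql.catalyst.optimizer, and the columnar join operators now extend BaseJoinExec with ShuffledJoin instead of HashJoin, so they derive buildPlan/streamedPlan themselves. A small self-contained sketch of that derivation, mirroring the diff (the helper name is illustrative):

import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
import org.apache.spark.sql.execution.SparkPlan

// Split the two children into (buildPlan, streamedPlan) based on the build side.
def splitByBuildSide(buildSide: BuildSide, left: SparkPlan, right: SparkPlan): (SparkPlan, SparkPlan) =
  buildSide match {
    case BuildLeft  => (left, right)
    case BuildRight => (right, left)
  }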
@@ -216,6 +216,7 @@ case class ColumnarHashAggregateExec(
return new ColumnarBatch(resultColumnVectors.map(_.asInstanceOf[ColumnVector]), 0)
}
val outputNumRows = output_rb.getLength

val output = ConverterUtils.fromArrowRecordBatch(hash_aggr_out_schema, output_rb)
ConverterUtils.releaseArrowRecordBatch(output_rb)
eval_elapse += System.nanoTime() - beforeEval
@@ -289,7 +290,7 @@ case class ColumnarHashAggregateExec(
var idx = 0
for (expr <- aggregateExpressions) {
expr.aggregateFunction match {
case Average(_) | StddevSamp(_) | Sum(_) | Max(_) | Min(_) =>
case Average(_) | StddevSamp(_, _) | Sum(_) | Max(_) | Min(_) =>
expr.mode match {
case Final =>
resultColumnVectors(idx).putNull(0)
@@ -387,7 +388,7 @@ case class ColumnarHashAggregateExec(
val aggregateFunction = expr.aggregateFunction
aggregateFunction match {
case Average(_) | Sum(_) | Count(_) | Max(_) | Min(_) =>
case StddevSamp(_) =>
case StddevSamp(_, _) =>
mode match {
case Partial | Final =>
case other =>
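The extra wildcard in the StddevSamp patterns above reflects Spark 3.1 adding a nullOnDivideByZero parameter to that aggregate. A hedged sketch of a 3.1-compatible match (the aggregate classes come from Spark's catalyst aggregate package; the helper itself is illustrative):

import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, Average, Max, Min, StddevSamp, Sum}

// Under Spark 3.1, StddevSamp carries a second (nullOnDivideByZero) field,
// so its pattern needs two wildcards.
def isHandledAggregate(f: AggregateFunction): Boolean = f match {
  case Average(_) | Sum(_) | Max(_) | Min(_) | StddevSamp(_, _) => true
  case _ => false
}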
@@ -51,7 +51,9 @@ import com.intel.oap.expression._
import com.intel.oap.vectorized.ExpressionEvaluator
import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils
import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide, HashJoin}
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
import org.apache.spark.sql.execution.joins.{HashJoin,ShuffledJoin,BaseJoinExec}
import org.apache.spark.sql.execution.joins.HashedRelationInfo

/**
* Performs a hash join of two child relations by first shuffling the data using the join keys.
@@ -65,9 +67,9 @@
left: SparkPlan,
right: SparkPlan,
projectList: Seq[NamedExpression] = null)
extends BinaryExecNode
extends BaseJoinExec
with ColumnarCodegenSupport
with HashJoin {
with ShuffledJoin {

val sparkConf = sparkContext.getConf
val numaBindingInfo = ColumnarPluginConfig.getConf.numaBindingInfo
@@ -80,6 +82,11 @@

buildCheck()

protected lazy val (buildPlan, streamedPlan) = buildSide match {
case BuildLeft => (left, right)
case BuildRight => (right, left)
}

val (buildKeyExprs, streamedKeyExprs) = {
require(
leftKeys.map(_.dataType) == rightKeys.map(_.dataType),
@@ -150,10 +157,6 @@ case class ColumnarShuffledHashJoinExec(
if (projectList == null || projectList.isEmpty) super.output
else projectList.map(_.toAttribute)

/*protected lazy val (buildPlan, streamedPlan, buildKeys, streamKeys) = buildSide match {
case BuildLeft => (left, right, leftKeys, rightKeys)
case BuildRight => (right, left, rightKeys, leftKeys)
}*/

def getBuildPlan: SparkPlan = buildPlan
override def updateMetrics(out_num_rows: Long, process_time: Long): Unit = {
@@ -168,12 +171,14 @@
s"ColumnarShuffledHashJoinExec doesn't support doExecute")
}
override def supportsColumnar = true

override def inputRDDs(): Seq[RDD[ColumnarBatch]] = streamedPlan match {
case c: ColumnarCodegenSupport if c.supportColumnarCodegen == true =>
c.inputRDDs
case _ =>
Seq(streamedPlan.executeColumnar())
}

override def getBuildPlans: Seq[(SparkPlan, SparkPlan)] = streamedPlan match {
case c: ColumnarCodegenSupport if c.supportColumnarCodegen == true =>
val childPlans = c.getBuildPlans
