From dfe096c5a399da594c9df7f6476da909372055de Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Wed, 6 Apr 2022 19:04:43 +0800 Subject: [PATCH 1/5] Add native CoalesceBatches implementation --- .../ArrowCoalesceBatchesJniWrapper.java | 32 +++ .../com/intel/oap/GazellePluginConfig.scala | 3 + .../execution/ArrowCoalesceBatchesExec.scala | 179 +++++++++++++++ .../oap/extension/ColumnarOverrides.scala | 53 ++++- .../shuffle/ArrowCoalesceBatchesSuite.scala | 214 ++++++++++++++++++ native-sql-engine/cpp/src/jni/jni_wrapper.cc | 85 +++++++ 6 files changed, 554 insertions(+), 12 deletions(-) create mode 100644 native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ArrowCoalesceBatchesJniWrapper.java create mode 100644 native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowCoalesceBatchesExec.scala create mode 100644 native-sql-engine/core/src/test/scala/org/apache/spark/shuffle/ArrowCoalesceBatchesSuite.scala diff --git a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ArrowCoalesceBatchesJniWrapper.java b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ArrowCoalesceBatchesJniWrapper.java new file mode 100644 index 000000000..0afab95ad --- /dev/null +++ b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ArrowCoalesceBatchesJniWrapper.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.intel.oap.vectorized; + +import java.io.IOException; + +public class ArrowCoalesceBatchesJniWrapper { + + public ArrowCoalesceBatchesJniWrapper() throws IOException { + JniUtils.getInstance(); + } + + public native byte[] nativeCoalesceBatches( + byte[] schema, int totalnumRows, int[] numRows, long[][] bufAddrs, + long[][] bufSizes, long memoryPollID) throws RuntimeException; + +} diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala index a2bce89fc..fbdab2758 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala @@ -83,6 +83,9 @@ class GazellePluginConfig(conf: SQLConf) extends Logging { val enableArrowRowToColumnar: Boolean = conf.getConfString("spark.oap.sql.columnar.rowtocolumnar", "true").toBoolean && enableCpu + val enableArrowCoalesceBatches: Boolean = + conf.getConfString("spark.oap.sql.columnar.arrowcoalescebatches", "true").toBoolean && enableCpu + val forceShuffledHashJoin: Boolean = conf.getConfString("spark.oap.sql.columnar.forceshuffledhashjoin", "false").toBoolean && enableCpu diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowCoalesceBatchesExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowCoalesceBatchesExec.scala new file mode 100644 index 000000000..dccc4a54b --- /dev/null +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowCoalesceBatchesExec.scala @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.intel.oap.execution + +import com.intel.oap.expression.ConverterUtils +import com.intel.oap.vectorized.{ArrowCoalesceBatchesJniWrapper, ArrowWritableColumnVector, CloseableColumnBatchIterator} +import org.apache.arrow.dataset.jni.UnsafeRecordBatchSerializer +import org.apache.arrow.vector.types.pojo.Schema +import org.apache.arrow.vector.util.VectorBatchAppender +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.util.ArrowUtils +import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch} + +import scala.collection.JavaConverters._ +import scala.collection.mutable.{ArrayBuffer, ListBuffer} + +case class ArrowCoalesceBatchesExec(child: SparkPlan) extends UnaryExecNode { + + override def output: Seq[Attribute] = child.output + + override def supportsColumnar: Boolean = true + + override def nodeName: String = "ArrowCoalesceBatches" + + override def outputPartitioning: Partitioning = child.outputPartitioning + + override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException + + override lazy val metrics: Map[String, SQLMetric] = Map( + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), + "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "input_batches"), + "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "output_batches"), + "collectTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime_collectbatch"), + "concatTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime_coalescebatch"), + "avgCoalescedNumRows" -> SQLMetrics + .createAverageMetric(sparkContext, "avg coalesced batch num rows")) + + override def doExecuteColumnar(): RDD[ColumnarBatch] = { + import ArrowCoalesceBatchesExec._ + + val recordsPerBatch = conf.arrowMaxRecordsPerBatch + val numOutputRows = longMetric("numOutputRows") + val numInputBatches = longMetric("numInputBatches") + val numOutputBatches = longMetric("numOutputBatches") + val collectTime = longMetric("collectTime") + val concatTime = longMetric("concatTime") + val avgCoalescedNumRows = longMetric("avgCoalescedNumRows") + + child.executeColumnar().mapPartitions { iter => + val jniWrapper = new ArrowCoalesceBatchesJniWrapper() + val beforeInput = System.nanoTime + val hasInput = iter.hasNext + collectTime += System.nanoTime - beforeInput + val res = if (hasInput) { + new Iterator[ColumnarBatch] { + var numBatchesTotal: Long = _ + var numRowsTotal: Long = _ + SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit] { _ => + if (numBatchesTotal > 0) { + avgCoalescedNumRows.set(numRowsTotal.toDouble / numBatchesTotal) + } + } + + override def hasNext: Boolean = { + val beforeNext = System.nanoTime + val hasNext = iter.hasNext + collectTime += System.nanoTime - beforeNext + hasNext + } + + override def next(): ColumnarBatch = { + + if (!hasNext) { + throw new NoSuchElementException("End of ColumnarBatch iterator") + } + + var rowCount = 0 + val batchesToAppend = ListBuffer[ColumnarBatch]() + + val arrBufAddrs = new ArrayBuffer[Array[Long]]() + val arrBufSizes = new ArrayBuffer[Array[Long]]() + val numrows = ListBuffer[Int]() + + val beforeConcat = 
System.nanoTime + while (hasNext && rowCount < recordsPerBatch) { + val delta: ColumnarBatch = iter.next() + delta.retain() + rowCount += delta.numRows + batchesToAppend += delta + + val bufAddrs = new ListBuffer[Long]() + val bufSizes = new ListBuffer[Long]() + val recordBatch = ConverterUtils.createArrowRecordBatch(delta) + recordBatch.getBuffers().asScala.foreach { buffer => bufAddrs += buffer.memoryAddress() } + recordBatch.getBuffersLayout().asScala.foreach { bufLayout => + bufSizes += bufLayout.getSize() + } + ConverterUtils.releaseArrowRecordBatch(recordBatch) + arrBufAddrs.append(bufAddrs.toArray) + arrBufSizes.append(bufSizes.toArray) + numrows.append(delta.numRows) + } + + // chendi: We need make sure target FieldTypes are exactly the same as src + val expected_output_arrow_fields = if (batchesToAppend.size > 0) { + (0 until batchesToAppend(0).numCols).map(i => { + batchesToAppend(0).column(i).asInstanceOf[ArrowWritableColumnVector].getValueVector.getField + }) + } else { + Nil + } + + val schema = new Schema(expected_output_arrow_fields.asJava) + val arrowSchema = ConverterUtils.getSchemaBytesBuf(schema) + + val serializedRecordBatch = jniWrapper.nativeCoalesceBatches( + arrowSchema, rowCount, numrows.toArray, arrBufAddrs.toArray, arrBufSizes.toArray, + SparkMemoryUtils.contextMemoryPool().getNativeInstanceId) + val rb = UnsafeRecordBatchSerializer.deserializeUnsafe(SparkMemoryUtils.contextAllocator(), serializedRecordBatch) + val ColVecArr = ConverterUtils.fromArrowRecordBatch(schema, rb) + val outputNumRows = rb.getLength + ConverterUtils.releaseArrowRecordBatch(rb) + val bigColBatch = new ColumnarBatch(ColVecArr.map(v => v.asInstanceOf[ColumnVector]).toArray, rowCount) + + concatTime += System.nanoTime - beforeConcat + numOutputRows += rowCount + numInputBatches += batchesToAppend.length + numOutputBatches += 1 + + // used for calculating avgCoalescedNumRows + numRowsTotal += rowCount + numBatchesTotal += 1 + + batchesToAppend.foreach(cb => cb.close()) + + bigColBatch + } + } + } else { + Iterator.empty + } + new CloseableColumnBatchIterator(res) + } + } + + // For spark 3.2. 
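+  // Spark 3.2 expects plan nodes to provide withNewChildInternal for its new
+  // tree-copy mechanism, while on Spark 3.1 the method is simply never called;
+  // declaring it without `override` lets the same source compile on both versions.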
+ protected def withNewChildInternal(newChild: SparkPlan): ArrowCoalesceBatchesExec = + copy(child = newChild) +} + +object ArrowCoalesceBatchesExec { + implicit class ArrowColumnarBatchRetainer(val cb: ColumnarBatch) { + def retain(): Unit = { + (0 until cb.numCols).toList.foreach(i => + cb.column(i).asInstanceOf[ArrowWritableColumnVector].retain()) + } + } +} diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala index 7aab341a0..2f3774690 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala @@ -148,6 +148,8 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] { child match { case p: CoalesceBatchesExec => ColumnarSortExec(plan.sortOrder, plan.global, p.child, plan.testSpillFrequency) + case p: ArrowCoalesceBatchesExec => + ColumnarSortExec(plan.sortOrder, plan.global, p.child, plan.testSpillFrequency) case _ => ColumnarSortExec(plan.sortOrder, plan.global, child, plan.testSpillFrequency) } @@ -160,10 +162,17 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] { plan.outputPartitioning, child) } else { - CoalesceBatchesExec( - ColumnarShuffleExchangeExec( - plan.outputPartitioning, - child)) + if (columnarConf.enableArrowCoalesceBatches) { + ArrowCoalesceBatchesExec( + ColumnarShuffleExchangeExec( + plan.outputPartitioning, + child)) + } else { + CoalesceBatchesExec( + ColumnarShuffleExchangeExec( + plan.outputPartitioning, + child)) + } } } else { plan.withNewChildren(Seq(child)) @@ -261,21 +270,39 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] { child match { case shuffle: ColumnarShuffleExchangeAdaptor => logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") - CoalesceBatchesExec( - ColumnarCustomShuffleReaderExec(child, partitionSpecs)) + if (columnarConf.enableArrowCoalesceBatches) { + ArrowCoalesceBatchesExec( + ColumnarCustomShuffleReaderExec(child, partitionSpecs)) + } else { + CoalesceBatchesExec( + ColumnarCustomShuffleReaderExec(child, partitionSpecs)) + } + // Use the below code to replace the above to realize compatibility on spark 3.1 & 3.2. 
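+          // (The ShuffleQueryStageExec branch below unwraps the AQE wrapper, and any
+          // ReusedExchangeExec, before checking for the columnar shuffle, which is what
+          // makes the same match usable across the Spark 3.1 and 3.2 plan shapes.)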
case shuffleQueryStageExec: ShuffleQueryStageExec => shuffleQueryStageExec.plan match { case s: ColumnarShuffleExchangeAdaptor => logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") - CoalesceBatchesExec( - ColumnarCustomShuffleReaderExec(child, partitionSpecs)) + if (columnarConf.enableArrowCoalesceBatches) { + ArrowCoalesceBatchesExec( + ColumnarCustomShuffleReaderExec(child, partitionSpecs)) + } else { + CoalesceBatchesExec( + ColumnarCustomShuffleReaderExec(child, partitionSpecs)) + } case r @ ReusedExchangeExec(_, s: ColumnarShuffleExchangeAdaptor) => logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") - CoalesceBatchesExec( - ColumnarCustomShuffleReaderExec( - child, - partitionSpecs)) + if (columnarConf.enableArrowCoalesceBatches) { + ArrowCoalesceBatchesExec( + ColumnarCustomShuffleReaderExec( + child, + partitionSpecs)) + } else { + CoalesceBatchesExec( + ColumnarCustomShuffleReaderExec( + child, + partitionSpecs)) + } case _ => plan } @@ -391,6 +418,8 @@ case class ColumnarPostOverrides() extends Rule[SparkPlan] { replaceWithColumnarPlan(child) case ColumnarToRowExec(child: CoalesceBatchesExec) => plan.withNewChildren(Seq(replaceWithColumnarPlan(child.child))) + case ColumnarToRowExec(child: ArrowCoalesceBatchesExec) => + plan.withNewChildren(Seq(replaceWithColumnarPlan(child.child))) case plan: ColumnarToRowExec => if (columnarConf.enableArrowColumnarToRow) { val child = replaceWithColumnarPlan(plan.child) diff --git a/native-sql-engine/core/src/test/scala/org/apache/spark/shuffle/ArrowCoalesceBatchesSuite.scala b/native-sql-engine/core/src/test/scala/org/apache/spark/shuffle/ArrowCoalesceBatchesSuite.scala new file mode 100644 index 000000000..d21e73d4b --- /dev/null +++ b/native-sql-engine/core/src/test/scala/org/apache/spark/shuffle/ArrowCoalesceBatchesSuite.scala @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.shuffle + +import java.nio.file.Files + +import com.intel.oap.execution.{ArrowCoalesceBatchesExec} +import com.intel.oap.tpc.util.TPCRunner +import org.apache.log4j.{Level, LogManager} +import org.apache.spark.SparkConf +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.functions.{col, expr} +import org.apache.spark.sql.test.SharedSparkSession + +class ArrowCoalesceBatchesSuite extends QueryTest with SharedSparkSession { + + private val MAX_DIRECT_MEMORY = "5000m" + private var runner: TPCRunner = _ + + private var lPath: String = _ + private var rPath: String = _ + private val scale = 100 + + override protected def sparkConf: SparkConf = { + val conf = super.sparkConf + conf.set("spark.memory.offHeap.size", String.valueOf(MAX_DIRECT_MEMORY)) + .set("spark.plugins", "com.intel.oap.GazellePlugin") + .set("spark.sql.codegen.wholeStage", "false") + .set("spark.sql.sources.useV1SourceList", "") + .set("spark.oap.sql.columnar.tmp_dir", "/tmp/") + .set("spark.sql.columnar.sort.broadcastJoin", "true") + .set("spark.storage.blockManagerSlaveTimeoutMs", "3600000") + .set("spark.executor.heartbeatInterval", "3600000") + .set("spark.network.timeout", "3601s") + .set("spark.oap.sql.columnar.preferColumnar", "true") + .set("spark.oap.sql.columnar.sortmergejoin", "true") + .set("spark.sql.columnar.codegen.hashAggregate", "false") + .set("spark.sql.columnar.sort", "true") + .set("spark.sql.columnar.window", "true") + .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + .set("spark.unsafe.exceptionOnMemoryLeak", "false") + .set("spark.network.io.preferDirectBufs", "false") + .set("spark.sql.sources.useV1SourceList", "arrow,parquet") + .set("spark.sql.autoBroadcastJoinThreshold", "-1") + .set("spark.oap.sql.columnar.sortmergejoin.lazyread", "true") + .set("spark.oap.sql.columnar.autorelease", "false") + .set("spark.sql.shuffle.partitions", "50") + .set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "5") + .set("spark.oap.sql.columnar.shuffledhashjoin.buildsizelimit", "200m") +// .set("spark.sql.join.preferSortMergeJoin", "false") +// .set("spark.sql.shuffle.partitions", "1") + .set("spark.oap.sql.columnar.wholestagecodegen", "false") + .set("spark.oap.sql.columnar.forceshuffledhashjoin", "true") + return conf + } + + override def beforeAll(): Unit = { + super.beforeAll() + LogManager.getRootLogger.setLevel(Level.WARN) + + val lfile = Files.createTempFile("", ".parquet").toFile + lfile.deleteOnExit() + lPath = lfile.getAbsolutePath + spark.range(2).select(col("id"), expr("1").as("kind"), + expr("array(1, 2)").as("arr_field"), + expr("array(\"hello\", \"world\")").as("arr_str_field"), + expr("array(array(1, 2), array(3, 4))").as("arr_arr_field"), + expr("array(struct(1, 2), struct(1, 2))").as("arr_struct_field"), + expr("array(map(1, 2), map(3,4))").as("arr_map_field"), + expr("struct(1, 2)").as("struct_field"), + expr("struct(1, struct(1, 2))").as("struct_struct_field"), + expr("struct(1, array(1, 2))").as("struct_array_field"), + expr("map(1, 2)").as("map_field"), + expr("map(1, map(3,4))").as("map_map_field"), + expr("map(1, array(1, 2))").as("map_arr_field"), + expr("map(struct(1, 2), 2)").as("map_struct_field")) + .coalesce(1) + .write + .format("parquet") + .mode("overwrite") + .parquet(lPath) + + val rfile = Files.createTempFile("", ".parquet").toFile + rfile.deleteOnExit() + rPath = rfile.getAbsolutePath + spark.range(2).select(col("id"), expr("id % 2").as("kind"), + expr("array(1, 2)").as("arr_field"), 
+ expr("struct(1, 2)").as("struct_field")) + .coalesce(1) + .write + .format("parquet") + .mode("overwrite") + .parquet(rPath) + + spark.catalog.createTable("ltab", lPath, "arrow") + spark.catalog.createTable("rtab", rPath, "arrow") + } + + test("Test Array in CoalesceBatches") { + val df = spark.sql("SELECT ltab.arr_field FROM ltab, rtab WHERE ltab.kind = rtab.kind") + df.explain(true) + df.show() + assert(df.queryExecution.executedPlan.find(_.isInstanceOf[ArrowCoalesceBatchesExec]).isDefined) + assert(df.count == 2) + } + + test("Test Nest Array in CoalesceBatches") { + val df = spark.sql("SELECT ltab.arr_arr_field FROM ltab, rtab WHERE ltab.kind = rtab.kind") + df.explain(true) + df.show() + assert(df.queryExecution.executedPlan.find(_.isInstanceOf[ArrowCoalesceBatchesExec]).isDefined) + assert(df.count == 2) + } + + test("Test Array_Struct in CoalesceBatches") { + val df = spark.sql("SELECT ltab.arr_struct_field FROM ltab, rtab WHERE ltab.kind = rtab.kind") + df.explain(true) + df.show() + assert(df.queryExecution.executedPlan.find(_.isInstanceOf[ArrowCoalesceBatchesExec]).isDefined) + assert(df.count == 2) + } + + test("Test Array_Map in CoalesceBatches") { + val df = spark.sql("SELECT ltab.arr_map_field FROM ltab, rtab WHERE ltab.kind = rtab.kind") + df.explain(true) + df.show() + assert(df.queryExecution.executedPlan.find(_.isInstanceOf[ArrowCoalesceBatchesExec]).isDefined) + assert(df.count == 2) + } + + test("Test Struct in CoalesceBatches") { + val df = spark.sql("SELECT ltab.struct_field FROM ltab, rtab WHERE ltab.kind = rtab.kind") + df.explain(true) + df.show() + assert(df.queryExecution.executedPlan.find(_.isInstanceOf[ArrowCoalesceBatchesExec]).isDefined) + assert(df.count() == 2) + } + + test("Test Nest Struct in CoalesceBatches") { + val df = spark.sql("SELECT ltab.struct_struct_field FROM ltab, rtab WHERE ltab.kind = rtab.kind") + df.explain(true) + df.show() + assert(df.queryExecution.executedPlan.find(_.isInstanceOf[ArrowCoalesceBatchesExec]).isDefined) + assert(df.count() == 2) + } + + test("Test Struct_Array in CoalesceBatches") { + val df = spark.sql("SELECT ltab.struct_array_field FROM ltab, rtab WHERE ltab.kind = rtab.kind") + df.explain(true) + df.show() + assert(df.queryExecution.executedPlan.find(_.isInstanceOf[ArrowCoalesceBatchesExec]).isDefined) + assert(df.count() == 2) + } + + test("Test Map in CoalesceBatches") { + val df = spark.sql("SELECT ltab.map_field FROM ltab, rtab WHERE ltab.kind = rtab.kind") + df.explain(true) + df.show() + assert(df.queryExecution.executedPlan.find(_.isInstanceOf[ArrowCoalesceBatchesExec]).isDefined) + assert(df.count() == 2) + } + + test("Test Nest Map in CoalesceBatches") { + val df = spark.sql("SELECT ltab.map_map_field FROM ltab, rtab WHERE ltab.kind = rtab.kind") + df.explain(true) + df.show() + assert(df.queryExecution.executedPlan.find(_.isInstanceOf[ArrowCoalesceBatchesExec]).isDefined) + assert(df.count() == 2) + } + + test("Test Map_Array in CoalesceBatches") { + val df = spark.sql("SELECT ltab.map_arr_field FROM ltab, rtab WHERE ltab.kind = rtab.kind") + df.explain(true) + df.show() + assert(df.queryExecution.executedPlan.find(_.isInstanceOf[ArrowCoalesceBatchesExec]).isDefined) + assert(df.count() == 2) + } + + test("Test Map_Struct in CoalesceBatches") { + val df = spark.sql("SELECT ltab.map_struct_field FROM ltab, rtab WHERE ltab.kind = rtab.kind") + df.explain(true) + df.show() + assert(df.queryExecution.executedPlan.find(_.isInstanceOf[ArrowCoalesceBatchesExec]).isDefined) + assert(df.count() == 2) + } + + 
test("Test Array String in CoalesceBatches") { + val df = spark.sql("SELECT ltab.arr_str_field FROM ltab, rtab WHERE ltab.kind = rtab.kind") + df.printSchema() + df.explain(true) + df.show() + assert(df.queryExecution.executedPlan.find(_.isInstanceOf[ArrowCoalesceBatchesExec]).isDefined) + assert(df.count == 2) + } + + override def afterAll(): Unit = { + super.afterAll() + } +} diff --git a/native-sql-engine/cpp/src/jni/jni_wrapper.cc b/native-sql-engine/cpp/src/jni/jni_wrapper.cc index 5e9ddad39..db10c486b 100644 --- a/native-sql-engine/cpp/src/jni/jni_wrapper.cc +++ b/native-sql-engine/cpp/src/jni/jni_wrapper.cc @@ -45,6 +45,11 @@ #include "proto/protobuf_utils.h" #include "shuffle/splitter.h" #include "utils/exception.h" +#include "arrow/array/concatenate.h" + +#include "arrow/type_fwd.h" +#include "arrow/result.h" +#include "arrow/array.h" namespace { @@ -1396,6 +1401,86 @@ Java_com_intel_oap_vectorized_ArrowColumnarToRowJniWrapper_nativeClose( JNI_METHOD_END() } +JNIEXPORT jobject JNICALL +Java_com_intel_oap_vectorized_ArrowCoalesceBatchesJniWrapper_nativeCoalesceBatches( + JNIEnv* env, jobject, jbyteArray schema_arr, jint total_num_rows, jintArray num_rows_arr, + jobjectArray buf_addrs, jobjectArray buf_sizes, jlong memory_pool_id) { + JNI_METHOD_START + if (schema_arr == NULL) { + JniThrow("Native Coalesce Batches: schema can't be null"); + } + if (buf_addrs == NULL) { + JniThrow("Native Coalesce Batches: buf_addrs can't be null"); + } + if (buf_sizes == NULL) { + JniThrow("Native Coalesce Batches: buf_sizes can't be null"); + } + + int in_bufs_len = env->GetArrayLength(buf_addrs); + if (in_bufs_len != env->GetArrayLength(buf_sizes)) { + JniThrow( + "Native Coalesce Batches: length of buf_addrs and buf_sizes mismatch"); + } + + std::shared_ptr schema; + // ValueOrDie in MakeSchema + MakeSchema(env, schema_arr, &schema); + + // convert the record batch to spark unsafe row. 
+ auto* pool = reinterpret_cast(memory_pool_id); + if (pool == nullptr) { + JniThrow("Memory pool does not exist or has been closed"); + } + + arrow::RecordBatchVector batches; + + for (jint i = 0; i < in_bufs_len; i++) { + jlongArray addr_array = (jlongArray)env->GetObjectArrayElement(buf_addrs, i); + jlongArray size_array = (jlongArray)env->GetObjectArrayElement(buf_sizes, i); + jint* rows_arr = env->GetIntArrayElements(num_rows_arr, nullptr); + + + int addrs_len = env->GetArrayLength(addr_array); + if (addrs_len != env->GetArrayLength(size_array)) { + JniThrow( + "Native Coalesce Batches: length of addr_array and size_array mismatch"); + } + jlong* addrs_buf = env->GetLongArrayElements(addr_array, JNI_FALSE); + jlong* sizes_buf = env->GetLongArrayElements(size_array, JNI_FALSE); + + std::shared_ptr rb; + JniAssertOkOrThrow(MakeRecordBatch(schema, rows_arr[i], (int64_t*)addrs_buf, + (int64_t*)sizes_buf, addrs_len, &rb), + "Native Coalesce Batches: make record batch failed"); + batches.push_back(rb); + + + + env->ReleaseLongArrayElements(addr_array, addrs_buf, JNI_ABORT); + env->ReleaseLongArrayElements(size_array, sizes_buf, JNI_ABORT); + env->ReleaseIntArrayElements(num_rows_arr, rows_arr, JNI_ABORT); + } + + // int num_columns = batches.at(0)->num_columns(); + int num_columns = schema->num_fields(); + arrow::ArrayVector arrayColumns; + for (jint i = 0; i < num_columns; i++) { + arrow::ArrayVector arrvec; + for (const auto& batch : batches) { + arrvec.push_back(batch->column(i)); + } + std::shared_ptr bigArr; + Concatenate(arrvec, pool, &bigArr); + // ARROW_ASSIGN_OR_RAISE(auto bigArr, Concatenate(arrvec, pool)); + arrayColumns.push_back(bigArr); + } + auto out_batch = arrow::RecordBatch::Make(schema, total_num_rows, arrayColumns); + jbyteArray serialized_record_batch = + JniGetOrThrow(ToBytes(env, out_batch), "Error serializing message"); + return serialized_record_batch; + JNI_METHOD_END(nullptr) +} + JNIEXPORT jobject JNICALL Java_com_intel_oap_vectorized_ArrowRowToColumnarJniWrapper_nativeConvertRowToColumnar( JNIEnv* env, jobject, jbyteArray schema_arr, jlongArray row_length, From 4c6c3003f8fadb54f1e9128f2c798f5a78f647dc Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Fri, 8 Apr 2022 11:19:11 +0800 Subject: [PATCH 2/5] Clean and format --- .../shuffle/ArrowCoalesceBatchesSuite.scala | 23 --------------- native-sql-engine/cpp/src/jni/jni_wrapper.cc | 28 ++++++++----------- 2 files changed, 12 insertions(+), 39 deletions(-) diff --git a/native-sql-engine/core/src/test/scala/org/apache/spark/shuffle/ArrowCoalesceBatchesSuite.scala b/native-sql-engine/core/src/test/scala/org/apache/spark/shuffle/ArrowCoalesceBatchesSuite.scala index d21e73d4b..e6078f70a 100644 --- a/native-sql-engine/core/src/test/scala/org/apache/spark/shuffle/ArrowCoalesceBatchesSuite.scala +++ b/native-sql-engine/core/src/test/scala/org/apache/spark/shuffle/ArrowCoalesceBatchesSuite.scala @@ -40,31 +40,8 @@ class ArrowCoalesceBatchesSuite extends QueryTest with SharedSparkSession { val conf = super.sparkConf conf.set("spark.memory.offHeap.size", String.valueOf(MAX_DIRECT_MEMORY)) .set("spark.plugins", "com.intel.oap.GazellePlugin") - .set("spark.sql.codegen.wholeStage", "false") - .set("spark.sql.sources.useV1SourceList", "") - .set("spark.oap.sql.columnar.tmp_dir", "/tmp/") - .set("spark.sql.columnar.sort.broadcastJoin", "true") - .set("spark.storage.blockManagerSlaveTimeoutMs", "3600000") - .set("spark.executor.heartbeatInterval", "3600000") - .set("spark.network.timeout", "3601s") - 
.set("spark.oap.sql.columnar.preferColumnar", "true") - .set("spark.oap.sql.columnar.sortmergejoin", "true") - .set("spark.sql.columnar.codegen.hashAggregate", "false") - .set("spark.sql.columnar.sort", "true") - .set("spark.sql.columnar.window", "true") .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") - .set("spark.unsafe.exceptionOnMemoryLeak", "false") - .set("spark.network.io.preferDirectBufs", "false") - .set("spark.sql.sources.useV1SourceList", "arrow,parquet") .set("spark.sql.autoBroadcastJoinThreshold", "-1") - .set("spark.oap.sql.columnar.sortmergejoin.lazyread", "true") - .set("spark.oap.sql.columnar.autorelease", "false") - .set("spark.sql.shuffle.partitions", "50") - .set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "5") - .set("spark.oap.sql.columnar.shuffledhashjoin.buildsizelimit", "200m") -// .set("spark.sql.join.preferSortMergeJoin", "false") -// .set("spark.sql.shuffle.partitions", "1") - .set("spark.oap.sql.columnar.wholestagecodegen", "false") .set("spark.oap.sql.columnar.forceshuffledhashjoin", "true") return conf } diff --git a/native-sql-engine/cpp/src/jni/jni_wrapper.cc b/native-sql-engine/cpp/src/jni/jni_wrapper.cc index db10c486b..34057256e 100644 --- a/native-sql-engine/cpp/src/jni/jni_wrapper.cc +++ b/native-sql-engine/cpp/src/jni/jni_wrapper.cc @@ -35,6 +35,7 @@ #include #include +#include "arrow/array/concatenate.h" #include "codegen/code_generator_factory.h" #include "codegen/common/hash_relation.h" #include "codegen/common/result_iterator.h" @@ -45,11 +46,10 @@ #include "proto/protobuf_utils.h" #include "shuffle/splitter.h" #include "utils/exception.h" -#include "arrow/array/concatenate.h" -#include "arrow/type_fwd.h" -#include "arrow/result.h" #include "arrow/array.h" +#include "arrow/result.h" +#include "arrow/type_fwd.h" namespace { @@ -1403,8 +1403,9 @@ Java_com_intel_oap_vectorized_ArrowColumnarToRowJniWrapper_nativeClose( JNIEXPORT jobject JNICALL Java_com_intel_oap_vectorized_ArrowCoalesceBatchesJniWrapper_nativeCoalesceBatches( - JNIEnv* env, jobject, jbyteArray schema_arr, jint total_num_rows, jintArray num_rows_arr, - jobjectArray buf_addrs, jobjectArray buf_sizes, jlong memory_pool_id) { + JNIEnv* env, jobject, jbyteArray schema_arr, jint total_num_rows, + jintArray num_rows_arr, jobjectArray buf_addrs, jobjectArray buf_sizes, + jlong memory_pool_id) { JNI_METHOD_START if (schema_arr == NULL) { JniThrow("Native Coalesce Batches: schema can't be null"); @@ -1418,8 +1419,7 @@ Java_com_intel_oap_vectorized_ArrowCoalesceBatchesJniWrapper_nativeCoalesceBatch int in_bufs_len = env->GetArrayLength(buf_addrs); if (in_bufs_len != env->GetArrayLength(buf_sizes)) { - JniThrow( - "Native Coalesce Batches: length of buf_addrs and buf_sizes mismatch"); + JniThrow("Native Coalesce Batches: length of buf_addrs and buf_sizes mismatch"); } std::shared_ptr schema; @@ -1431,31 +1431,27 @@ Java_com_intel_oap_vectorized_ArrowCoalesceBatchesJniWrapper_nativeCoalesceBatch if (pool == nullptr) { JniThrow("Memory pool does not exist or has been closed"); } - + arrow::RecordBatchVector batches; - + for (jint i = 0; i < in_bufs_len; i++) { jlongArray addr_array = (jlongArray)env->GetObjectArrayElement(buf_addrs, i); jlongArray size_array = (jlongArray)env->GetObjectArrayElement(buf_sizes, i); jint* rows_arr = env->GetIntArrayElements(num_rows_arr, nullptr); - int addrs_len = env->GetArrayLength(addr_array); if (addrs_len != env->GetArrayLength(size_array)) { - JniThrow( - "Native Coalesce Batches: length of addr_array and 
size_array mismatch"); + JniThrow("Native Coalesce Batches: length of addr_array and size_array mismatch"); } jlong* addrs_buf = env->GetLongArrayElements(addr_array, JNI_FALSE); jlong* sizes_buf = env->GetLongArrayElements(size_array, JNI_FALSE); std::shared_ptr rb; JniAssertOkOrThrow(MakeRecordBatch(schema, rows_arr[i], (int64_t*)addrs_buf, - (int64_t*)sizes_buf, addrs_len, &rb), - "Native Coalesce Batches: make record batch failed"); + (int64_t*)sizes_buf, addrs_len, &rb), + "Native Coalesce Batches: make record batch failed"); batches.push_back(rb); - - env->ReleaseLongArrayElements(addr_array, addrs_buf, JNI_ABORT); env->ReleaseLongArrayElements(size_array, sizes_buf, JNI_ABORT); env->ReleaseIntArrayElements(num_rows_arr, rows_arr, JNI_ABORT); From 17439d30ec94d61b904beaebc7b63d5991a26f73 Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Fri, 8 Apr 2022 11:51:12 +0800 Subject: [PATCH 3/5] Format cpp --- native-sql-engine/cpp/src/jni/jni_wrapper.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/native-sql-engine/cpp/src/jni/jni_wrapper.cc b/native-sql-engine/cpp/src/jni/jni_wrapper.cc index 34057256e..485814b11 100644 --- a/native-sql-engine/cpp/src/jni/jni_wrapper.cc +++ b/native-sql-engine/cpp/src/jni/jni_wrapper.cc @@ -15,6 +15,8 @@ * limitations under the License. */ +#include +#include #include #include #include @@ -25,6 +27,8 @@ #include #include #include +#include +#include #include #include #include @@ -35,7 +39,6 @@ #include #include -#include "arrow/array/concatenate.h" #include "codegen/code_generator_factory.h" #include "codegen/common/hash_relation.h" #include "codegen/common/result_iterator.h" @@ -47,9 +50,6 @@ #include "shuffle/splitter.h" #include "utils/exception.h" -#include "arrow/array.h" -#include "arrow/result.h" -#include "arrow/type_fwd.h" namespace { From a2c504fe2e9054de4771baf9b4a257c424a1acbd Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Fri, 8 Apr 2022 11:52:24 +0800 Subject: [PATCH 4/5] Format cpp --- native-sql-engine/cpp/src/jni/jni_wrapper.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/native-sql-engine/cpp/src/jni/jni_wrapper.cc b/native-sql-engine/cpp/src/jni/jni_wrapper.cc index 485814b11..b1bc66cdc 100644 --- a/native-sql-engine/cpp/src/jni/jni_wrapper.cc +++ b/native-sql-engine/cpp/src/jni/jni_wrapper.cc @@ -15,8 +15,8 @@ * limitations under the License. */ -#include #include +#include #include #include #include @@ -50,7 +50,6 @@ #include "shuffle/splitter.h" #include "utils/exception.h" - namespace { #define JNI_METHOD_START try { From 9cec7e1781e9bc19ef88ffd761972d08176ca797 Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Wed, 13 Apr 2022 11:14:24 +0800 Subject: [PATCH 5/5] Add ut for memory leak detection --- .../cpp/src/tests/jniutils_test.cc | 54 +++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/native-sql-engine/cpp/src/tests/jniutils_test.cc b/native-sql-engine/cpp/src/tests/jniutils_test.cc index 0d29a1a9e..2ec2e8928 100644 --- a/native-sql-engine/cpp/src/tests/jniutils_test.cc +++ b/native-sql-engine/cpp/src/tests/jniutils_test.cc @@ -15,6 +15,7 @@ * limitations under the License. 
*/ +#include #include #include #include @@ -86,6 +87,59 @@ TEST_F(JniUtilsTest, TestMakeRecordBatchWithList) { ASSERT_TRUE(rb->Equals(*input_batch_arr.get())); } +TEST_F(JniUtilsTest, TestRecordBatchConcatenate) { + auto f_arr_str = field("f_arr", arrow::list(arrow::utf8())); + auto f_arr_bool = field("f_bool", arrow::list(arrow::boolean())); + auto f_arr_int32 = field("f_int32", arrow::list(arrow::int32())); + auto f_arr_double = field("f_double", arrow::list(arrow::float64())); + auto f_arr_decimal = field("f_decimal", arrow::list(arrow::decimal(10, 2))); + + auto schema = + arrow::schema({f_arr_str, f_arr_bool, f_arr_int32, f_arr_double, f_arr_decimal}); + + const std::vector input_data_arr = { + R"([["alice0", "bob1"], ["alice2"], ["bob3"], ["Alice4", "Bob5", "AlicE6"], ["boB7"], ["ALICE8", "BOB9"]])", + R"([[true, null], [true, true, true], [false], [true], [false], [false]])", + R"([[1, 2, 3], [9, 8], null, [3, 1], [0], [1, 9, null]])", + R"([[0.26121], [-9.12123, 6.111111], [8.121], [7.21, null], [3.2123, 6,1121], null])", + R"([["0.26"], ["-9.12", "6.11"], ["8.12"], ["7.21", null], ["3.21", "6.11"], ["9.88"]])"}; + + std::shared_ptr batch1; + std::shared_ptr batch2; + std::shared_ptr res_batch_arr; + MakeInputBatch(input_data_arr, schema, &batch1); + MakeInputBatch(input_data_arr, schema, &batch2); + + arrow::RecordBatchVector batches; + batches.push_back(batch1); + batches.push_back(batch2); + + for (int i = 0; i < 100; i++) { + for (int i = 0; i < 10000; i++) { + int total_num_rows = batch1->num_rows() + batch2->num_rows(); + + // int num_columns = batches.at(0)->num_columns(); + int num_columns = schema->num_fields(); + arrow::ArrayVector arrayColumns; + for (jint i = 0; i < num_columns; i++) { + arrow::ArrayVector arrvec; + for (const auto& batch : batches) { + arrvec.push_back(batch->column(i)); + } + std::shared_ptr bigArr; + Concatenate(arrvec, default_memory_pool(), &bigArr); + // ARROW_ASSIGN_OR_RAISE(auto bigArr, Concatenate(arrvec, pool)); + arrayColumns.push_back(bigArr); + } + auto out_batch = arrow::RecordBatch::Make(schema, total_num_rows, arrayColumns); + + std::cout << "out_batch->num_rows():" << out_batch->num_rows() << std::endl; + } + + sleep(3); + } +} + TEST_F(JniUtilsTest, TestMakeRecordBatchBuild_Int_Struct) { auto f_int32 = field("f_simple_int32", arrow::int32()); auto f_struct_int32 =