From e329253bfc7f30cfafd05596280747cb350b3cde Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Wed, 13 Apr 2022 12:17:28 +0800 Subject: [PATCH 1/3] [NSE-728] Upgrade to Arrow 7.0.0 (#729) Known issues of current Arrow 7.0.0 support: 1. Data Source writing / ORC reading is disabled; 2. Data Source filter pushdown is disabled; 3. FastPFor compression is leading to unexpected concurrent memory writes. Use LZ4 instead. --- .github/workflows/tpch.yml | 2 +- .github/workflows/unittests.yml | 6 +- arrow-data-source/README.md | 2 +- .../intel/oap/spark/sql/ArrowWriteQueue.scala | 6 +- arrow-data-source/script/build_arrow.sh | 4 +- .../datasources/arrow/ArrowFileFormat.scala | 5 +- .../datasources/v2/arrow/ArrowFilters.scala | 206 ------------------ .../arrow/ArrowPartitionReaderFactory.scala | 3 +- .../datasources/v2/arrow/ArrowUtils.scala | 3 +- .../arrow/ArrowDataSourceTest.scala | 12 +- docs/ApacheArrowInstallation.md | 2 +- docs/Installation.md | 4 +- .../com/intel/oap/vectorized/JniUtils.java | 8 +- native-sql-engine/cpp/src/CMakeLists.txt | 8 +- .../codegen/arrow_compute/ext/actions_impl.cc | 37 ++-- .../codegen/arrow_compute/ext/array_taker.h | 22 +- .../ext/hash_aggregate_kernel.cc | 25 ++- .../codegen/arrow_compute/ext/kernels_ext.cc | 6 +- .../codegen/arrow_compute/ext/sort_kernel.cc | 51 ++++- .../cpp/src/codegen/common/visitor_base.h | 6 + native-sql-engine/cpp/src/jni/jni_common.h | 8 +- native-sql-engine/cpp/src/jni/jni_wrapper.cc | 3 +- .../operators/row_to_columnar_converter.cc | 42 ++-- native-sql-engine/cpp/src/shuffle/splitter.cc | 14 +- .../src/tests/arrow_compute_test_aggregate.cc | 40 ++-- .../arrow_compute_test_aggregate_decimal.cc | 4 +- .../src/tests/arrow_compute_test_window.cc | 3 +- .../cpp/src/tests/arrow_compute_test_wscg.cc | 35 +-- .../cpp/src/third_party/arrow/utils/hashing.h | 17 +- pom.xml | 2 +- 30 files changed, 244 insertions(+), 342 deletions(-) delete mode 100644 arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowFilters.scala diff --git a/.github/workflows/tpch.yml b/.github/workflows/tpch.yml index 55995e960..a8b7c4715 100644 --- a/.github/workflows/tpch.yml +++ b/.github/workflows/tpch.yml @@ -51,7 +51,7 @@ jobs: run: | cd /tmp git clone https://github.com/oap-project/arrow.git - cd arrow && git checkout arrow-4.0.0-oap && cd cpp + cd arrow && git checkout arrow-7.0.0-oap && cd cpp mkdir build && cd build cmake .. -DARROW_JNI=ON -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_ORC=ON -DARROW_CSV=ON -DARROW_HDFS=ON -DARROW_FILESYSTEM=ON -DARROW_WITH_SNAPPY=ON -DARROW_JSON=ON -DARROW_DATASET=ON -DARROW_WITH_LZ4=ON -DARROW_JEMALLOC=OFF && make -j2 sudo make install diff --git a/.github/workflows/unittests.yml b/.github/workflows/unittests.yml index 50263fae5..7d67a6123 100644 --- a/.github/workflows/unittests.yml +++ b/.github/workflows/unittests.yml @@ -45,7 +45,7 @@ jobs: run: | cd /tmp git clone https://github.com/oap-project/arrow.git - cd arrow && git checkout arrow-4.0.0-oap && cd cpp + cd arrow && git checkout arrow-7.0.0-oap && cd cpp mkdir build && cd build cmake .. -DARROW_JNI=ON -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_ORC=ON -DARROW_CSV=ON -DARROW_HDFS=ON -DARROW_FILESYSTEM=ON -DARROW_WITH_SNAPPY=ON -DARROW_JSON=ON -DARROW_DATASET=ON -DARROW_WITH_LZ4=ON -DGTEST_ROOT=/usr/src/gtest && make -j2 sudo make install @@ -88,7 +88,7 @@ jobs: run: | cd /tmp git clone https://github.com/oap-project/arrow.git - cd arrow && git checkout arrow-4.0.0-oap && cd cpp + cd arrow && git checkout arrow-7.0.0-oap && cd cpp mkdir build && cd build cmake .. -DARROW_JNI=ON -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_ORC=ON -DARROW_CSV=ON -DARROW_HDFS=ON -DARROW_FILESYSTEM=ON -DARROW_WITH_SNAPPY=ON -DARROW_JSON=ON -DARROW_DATASET=ON -DARROW_WITH_LZ4=ON -DGTEST_ROOT=/usr/src/gtest && make -j2 sudo make install @@ -133,7 +133,7 @@ jobs: run: | cd /tmp git clone https://github.com/oap-project/arrow.git - cd arrow && git checkout arrow-4.0.0-oap && cd cpp + cd arrow && git checkout arrow-7.0.0-oap && cd cpp mkdir build && cd build cmake .. -DARROW_JNI=ON -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_ORC=ON -DARROW_CSV=ON -DARROW_HDFS=ON -DARROW_FILESYSTEM=ON -DARROW_WITH_SNAPPY=ON -DARROW_JSON=ON -DARROW_DATASET=ON -DARROW_WITH_LZ4=ON -DGTEST_ROOT=/usr/src/gtest && make -j2 sudo make install diff --git a/arrow-data-source/README.md b/arrow-data-source/README.md index 20e07cf9e..549f528e5 100644 --- a/arrow-data-source/README.md +++ b/arrow-data-source/README.md @@ -117,7 +117,7 @@ You have to use a customized Arrow to support for our datasets Java API. ``` // build arrow-cpp -git clone -b arrow-4.0.0-oap https://github.com/oap-project/arrow.git +git clone -b arrow-7.0.0-oap https://github.com/oap-project/arrow.git cd arrow/cpp mkdir build cd build diff --git a/arrow-data-source/common/src/main/scala/com/intel/oap/spark/sql/ArrowWriteQueue.scala b/arrow-data-source/common/src/main/scala/com/intel/oap/spark/sql/ArrowWriteQueue.scala index d03ab27b0..84c53bbb3 100644 --- a/arrow-data-source/common/src/main/scala/com/intel/oap/spark/sql/ArrowWriteQueue.scala +++ b/arrow-data-source/common/src/main/scala/com/intel/oap/spark/sql/ArrowWriteQueue.scala @@ -27,7 +27,6 @@ import java.util.regex.Pattern import com.intel.oap.spark.sql.ArrowWriteQueue.EOS_BATCH import com.intel.oap.spark.sql.ArrowWriteQueue.ScannerImpl -import org.apache.arrow.dataset.file.DatasetFileWriter import org.apache.arrow.dataset.file.format.FileFormat import org.apache.arrow.dataset.scanner.Scanner import org.apache.arrow.dataset.scanner.ScanTask @@ -47,12 +46,15 @@ class ArrowWriteQueue(schema: Schema, fileFormat: FileFormat, outputFileURI: Str val dirURI = matcher.group(1) val fileName = matcher.group(2) - DatasetFileWriter.write(scanner, fileFormat, dirURI, Array(), 1, fileName) +// disable write by arrow 7.0.0 +// DatasetFileWriter.write(scanner, fileFormat, dirURI, Array(), 1, fileName) }, "ArrowWriteQueue - " + UUID.randomUUID().toString) writeThread.start() def enqueue(batch: ArrowRecordBatch): Unit = { + // disable write by arrow 7.0.0 + throw new UnsupportedOperationException("write is disabled by arrow 7.0.0 rebase") scanner.enqueue(batch) } diff --git a/arrow-data-source/script/build_arrow.sh b/arrow-data-source/script/build_arrow.sh index d8ec40128..3de988213 100755 --- a/arrow-data-source/script/build_arrow.sh +++ b/arrow-data-source/script/build_arrow.sh @@ -62,7 +62,7 @@ echo "ARROW_SOURCE_DIR=${ARROW_SOURCE_DIR}" echo "ARROW_INSTALL_DIR=${ARROW_INSTALL_DIR}" mkdir -p $ARROW_SOURCE_DIR mkdir -p $ARROW_INSTALL_DIR -git clone https://github.com/oap-project/arrow.git --branch arrow-4.0.0-oap $ARROW_SOURCE_DIR +git clone https://github.com/oap-project/arrow.git --branch arrow-7.0.0-oap $ARROW_SOURCE_DIR pushd $ARROW_SOURCE_DIR cmake ./cpp \ @@ -98,7 +98,7 @@ make -j$NPROC make install cd java -mvn clean install -P arrow-jni -am -Darrow.cpp.build.dir=${ARROW_INSTALL_DIR}/lib -DskipTests -Dcheckstyle.skip +mvn clean install -P arrow-jni -pl dataset,gandiva -am -Darrow.cpp.build.dir=${ARROW_INSTALL_DIR}/lib -DskipTests -Dcheckstyle.skip popd echo "Finish to build Arrow from Source !!!" else diff --git a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowFileFormat.scala b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowFileFormat.scala index 32632bfe0..f0426258c 100644 --- a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowFileFormat.scala +++ b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowFileFormat.scala @@ -23,7 +23,7 @@ import scala.collection.JavaConverters._ import com.intel.oap.spark.sql.ArrowWriteExtension.FakeRow import com.intel.oap.spark.sql.ArrowWriteQueue -import com.intel.oap.spark.sql.execution.datasources.v2.arrow.{ArrowFilters, ArrowOptions, ArrowUtils} +import com.intel.oap.spark.sql.execution.datasources.v2.arrow.{ArrowOptions, ArrowUtils} import com.intel.oap.spark.sql.execution.datasources.v2.arrow.ArrowSQLConf._ import com.intel.oap.vectorized.ArrowWritableColumnVector import org.apache.arrow.dataset.scanner.ScanOptions @@ -128,7 +128,8 @@ class ArrowFileFormat extends FileFormat with DataSourceRegister with Serializab val dataset = factory.finish(); val filter = if (enableFilterPushDown) { - ArrowFilters.translateFilters(filters) + // disable filter pushdown by arrow 7.0.0 + org.apache.arrow.dataset.filter.Filter.EMPTY } else { org.apache.arrow.dataset.filter.Filter.EMPTY } diff --git a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowFilters.scala b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowFilters.scala deleted file mode 100644 index f33c7995a..000000000 --- a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowFilters.scala +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.intel.oap.spark.sql.execution.datasources.v2.arrow - -import org.apache.arrow.dataset.DatasetTypes -import org.apache.arrow.dataset.DatasetTypes.TreeNode -import org.apache.arrow.dataset.filter.FilterImpl - -import org.apache.spark.sql.sources._ -import org.apache.spark.sql.types.StructType - -object ArrowFilters { - def pruneWithSchema(pushedFilters: Array[Filter], schema: StructType): Seq[Filter] = { - pushedFilters.filter(pushedFilter => { - isToBeAccepted(pushedFilter, schema) - }) - } - - private def isToBeAccepted(pushedFilter: Filter, schema: StructType): Boolean = { - pushedFilter match { - case EqualTo(attribute, value) => existsIn(attribute, schema) - case GreaterThan(attribute, value) => existsIn(attribute, schema) - case GreaterThanOrEqual(attribute, value) => existsIn(attribute, schema) - case LessThan(attribute, value) => existsIn(attribute, schema) - case LessThanOrEqual(attribute, value) => existsIn(attribute, schema) - case Not(child) => isToBeAccepted(child, schema) - case And(left, right) => isToBeAccepted(left, schema) && isToBeAccepted(right, schema) - case Or(left, right) => isToBeAccepted(left, schema) && isToBeAccepted(right, schema) - case IsNotNull(attribute) => existsIn(attribute, schema) - case IsNull(attribute) => existsIn(attribute, schema) - case _ => false // fixme complete this - } - } - - private def existsIn(attr: String, schema: StructType): Boolean = { - schema.foreach(f => { - if (f.name == attr) { - return true; - } - }) - false - } - - def translateFilters(pushedFilters: Seq[Filter]): org.apache.arrow.dataset.filter.Filter = { - val node = pushedFilters - .flatMap(translateFilter) - .reduceOption((t1: TreeNode, t2: TreeNode) => { - DatasetTypes.TreeNode.newBuilder.setAndNode( - DatasetTypes.AndNode.newBuilder() - .setLeftArg(t1) - .setRightArg(t2) - .build()).build() - }) - if (node.isDefined) { - new FilterImpl(DatasetTypes.Condition.newBuilder() - .setRoot(node.get).build) - } else { - org.apache.arrow.dataset.filter.Filter.EMPTY - } - } - - private def translateValue(value: Any): Option[TreeNode] = { - value match { - case v: Integer => Some( - DatasetTypes.TreeNode.newBuilder.setIntNode( - DatasetTypes.IntNode.newBuilder.setValue(v).build) - .build) - case v: Long => Some( - DatasetTypes.TreeNode.newBuilder.setLongNode( - DatasetTypes.LongNode.newBuilder.setValue(v).build) - .build) - case v: Float => Some( - DatasetTypes.TreeNode.newBuilder.setFloatNode( - DatasetTypes.FloatNode.newBuilder.setValue(v).build) - .build) - case v: Double => Some( - DatasetTypes.TreeNode.newBuilder.setDoubleNode( - DatasetTypes.DoubleNode.newBuilder.setValue(v).build) - .build) - case v: Boolean => Some( - DatasetTypes.TreeNode.newBuilder.setBooleanNode( - DatasetTypes.BooleanNode.newBuilder.setValue(v).build) - .build) - case _ => None // fixme complete this - } - } - - private def translateFilter(pushedFilter: Filter): Option[TreeNode] = { - pushedFilter match { - case EqualTo(attribute, value) => - createComparisonNode("equal", attribute, value) - case GreaterThan(attribute, value) => - createComparisonNode("greater", attribute, value) - case GreaterThanOrEqual(attribute, value) => - createComparisonNode("greater_equal", attribute, value) - case LessThan(attribute, value) => - createComparisonNode("less", attribute, value) - case LessThanOrEqual(attribute, value) => - createComparisonNode("less_equal", attribute, value) - case Not(child) => - createNotNode(child) - case And(left, right) => - createAndNode(left, right) - case Or(left, right) => - createOrNode(left, right) - case IsNotNull(attribute) => - createIsNotNullNode(attribute) - case IsNull(attribute) => - createIsNullNode(attribute) - case _ => None // fixme complete this - } - } - - private def createComparisonNode(opName: String, - attribute: String, value: Any): Option[TreeNode] = { - val translatedValue = translateValue(value) - translatedValue match { - case Some(v) => Some( - DatasetTypes.TreeNode.newBuilder.setCpNode( - DatasetTypes.ComparisonNode.newBuilder - .setOpName(opName) // todo make op names enumerable - .setLeftArg( - DatasetTypes.TreeNode.newBuilder.setFieldNode( - DatasetTypes.FieldNode.newBuilder.setName(attribute).build) - .build) - .setRightArg(v) - .build) - .build) - case None => None - } - } - - def createNotNode(child: Filter): Option[TreeNode] = { - val translatedChild = translateFilter(child) - if (translatedChild.isEmpty) { - return None - } - Some(DatasetTypes.TreeNode.newBuilder - .setNotNode(DatasetTypes.NotNode.newBuilder.setArgs(translatedChild.get).build()).build()) - } - - def createIsNotNullNode(attribute: String): Option[TreeNode] = { - Some(DatasetTypes.TreeNode.newBuilder - .setIsValidNode( - DatasetTypes.IsValidNode.newBuilder.setArgs( - DatasetTypes.TreeNode.newBuilder.setFieldNode( - DatasetTypes.FieldNode.newBuilder.setName(attribute).build) - .build).build()).build()) - } - - def createIsNullNode(attribute: String): Option[TreeNode] = { - Some(DatasetTypes.TreeNode.newBuilder - .setNotNode( - DatasetTypes.NotNode.newBuilder.setArgs( - DatasetTypes.TreeNode.newBuilder - .setIsValidNode( - DatasetTypes.IsValidNode.newBuilder.setArgs( - DatasetTypes.TreeNode.newBuilder.setFieldNode( - DatasetTypes.FieldNode.newBuilder.setName(attribute).build) - .build) - .build()).build()).build()).build()) - } - - def createAndNode(left: Filter, right: Filter): Option[TreeNode] = { - val translatedLeft = translateFilter(left) - val translatedRight = translateFilter(right) - if (translatedLeft.isEmpty || translatedRight.isEmpty) { - return None - } - Some(DatasetTypes.TreeNode.newBuilder - .setAndNode(DatasetTypes.AndNode.newBuilder - .setLeftArg(translatedLeft.get) - .setRightArg(translatedRight.get) - .build()) - .build()) - } - - def createOrNode(left: Filter, right: Filter): Option[TreeNode] = { - val translatedLeft = translateFilter(left) - val translatedRight = translateFilter(right) - if (translatedLeft.isEmpty || translatedRight.isEmpty) { - return None - } - Some(DatasetTypes.TreeNode.newBuilder - .setOrNode(DatasetTypes.OrNode.newBuilder - .setLeftArg(translatedLeft.get) - .setRightArg(translatedRight.get) - .build()) - .build()) - } -} diff --git a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowPartitionReaderFactory.scala b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowPartitionReaderFactory.scala index 99ccd781a..b661cca97 100644 --- a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowPartitionReaderFactory.scala +++ b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowPartitionReaderFactory.scala @@ -61,7 +61,8 @@ case class ArrowPartitionReaderFactory( partitionedFile.start, partitionedFile.length, options) val dataset = factory.finish() val filter = if (enableFilterPushDown) { - ArrowFilters.translateFilters(ArrowFilters.pruneWithSchema(pushedFilters, readDataSchema)) + // disable filter pushdown by arrow 7.0.0 + org.apache.arrow.dataset.filter.Filter.EMPTY } else { org.apache.arrow.dataset.filter.Filter.EMPTY } diff --git a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowUtils.scala b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowUtils.scala index f6cc0303e..5243af4d3 100644 --- a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowUtils.scala +++ b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowUtils.scala @@ -117,7 +117,8 @@ object ArrowUtils { val paramMap = options.parameters.toMap.asJava options.originalFormat match { case "parquet" => org.apache.arrow.dataset.file.format.ParquetFileFormat.create(paramMap) - case "orc" => org.apache.arrow.dataset.file.format.OrcFileFormat.create(paramMap) +// disable orc by arrow 7.0.0 +// case "orc" => org.apache.arrow.dataset.file.format.OrcFileFormat.create(paramMap) case "csv" => org.apache.arrow.dataset.file.format.CsvFileFormat.create(paramMap) case _ => throw new IllegalArgumentException("Unrecognizable format") } diff --git a/arrow-data-source/standard/src/test/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowDataSourceTest.scala b/arrow-data-source/standard/src/test/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowDataSourceTest.scala index 9896ac1b4..7a54f0f23 100644 --- a/arrow-data-source/standard/src/test/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowDataSourceTest.scala +++ b/arrow-data-source/standard/src/test/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowDataSourceTest.scala @@ -203,7 +203,7 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession { assert(rows.length === 3) } - test("simple parquet write") { + ignore("simple parquet write") { val path = ArrowDataSourceTest.locateResourcePath(parquetFile3) val frame = spark.read .option(ArrowOptions.KEY_ORIGINAL_FORMAT, "parquet") @@ -339,7 +339,7 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession { df.show() } - test("orc reader on data type: struct, array, map") { + ignore("orc reader on data type: struct, array, map") { val path = ArrowDataSourceTest.locateResourcePath(orcFile1) val frame = spark.read .option(ArrowOptions.KEY_ORIGINAL_FORMAT, "orc") @@ -363,7 +363,7 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession { } private val orcFile = "people.orc" - test("read orc file") { + ignore("read orc file") { val path = ArrowDataSourceTest.locateResourcePath(orcFile) verifyFrame( spark.read @@ -372,7 +372,7 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession { .load(path), 2, 3) } - test("read orc file - programmatic API ") { + ignore("read orc file - programmatic API ") { val path = ArrowDataSourceTest.locateResourcePath(orcFile) verifyFrame( spark.read @@ -380,7 +380,7 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession { .arrow(path), 2, 3) } - test("create catalog table for orc") { + ignore("create catalog table for orc") { val path = ArrowDataSourceTest.locateResourcePath(orcFile) // spark.catalog.createTable("people", path, "arrow") spark.catalog.createTable("people", "arrow", Map("path" -> path, "originalFormat" -> "orc")) @@ -389,7 +389,7 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession { verifyFrame(spark.sql(sql), 2, 3) } - test("simple SQL query on orc file ") { + ignore("simple SQL query on orc file ") { val path = ArrowDataSourceTest.locateResourcePath(orcFile) val frame = spark.read .option(ArrowOptions.KEY_ORIGINAL_FORMAT, "orc") diff --git a/docs/ApacheArrowInstallation.md b/docs/ApacheArrowInstallation.md index c40734dda..1b2a584fe 100644 --- a/docs/ApacheArrowInstallation.md +++ b/docs/ApacheArrowInstallation.md @@ -30,7 +30,7 @@ Please make sure your cmake version is qualified based on the prerequisite. # Arrow ``` shell git clone https://github.com/oap-project/arrow.git -cd arrow && git checkout arrow-4.0.0-oap +cd arrow && git checkout arrow-7.0.0-oap mkdir -p arrow/cpp/release-build cd arrow/cpp/release-build cmake -DARROW_DEPENDENCY_SOURCE=BUNDLED -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_ORC=ON -DARROW_CSV=ON -DARROW_HDFS=ON -DARROW_BOOST_USE_SHARED=ON -DARROW_JNI=ON -DARROW_DATASET=ON -DARROW_WITH_PROTOBUF=ON -DARROW_WITH_SNAPPY=ON -DARROW_WITH_LZ4=ON -DARROW_FILESYSTEM=ON -DARROW_JSON=ON .. diff --git a/docs/Installation.md b/docs/Installation.md index f27f6f56d..d632a9cd6 100644 --- a/docs/Installation.md +++ b/docs/Installation.md @@ -31,8 +31,8 @@ Based on the different environment, there are some parameters can be set via -D | arrow_root | When build_arrow set to False, arrow_root will be enabled to find the location of your existing arrow library. | /usr/local | | build_protobuf | Build Protobuf from Source. If set to False, default library path will be used to find protobuf library. | True | -When build_arrow set to True, the build_arrow.sh will be launched and compile a custom arrow library from [OAP Arrow](https://github.com/oap-project/arrow/tree/arrow-4.0.0-oap) -If you wish to change any parameters from Arrow, you can change it from the `build_arrow.sh` script under `gazelle_plugin/arrow-data-source/script/`. +When build_arrow set to True, the build_arrow.sh will be launched and compile a custom arrow library from [OAP Arrow](https://github.com/oap-project/arrow/tree/arrow-7.0.0-oap) +If you wish to change any parameters from Arrow, you can change it from the `build_arrow.sh` script under `native-sql-engine/arrow-data-source/script/`. ### Additional Notes [Notes for Installation Issues](./InstallationNotes.md) diff --git a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/JniUtils.java b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/JniUtils.java index 8d5baa49c..7a55f82b5 100644 --- a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/JniUtils.java +++ b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/JniUtils.java @@ -39,10 +39,10 @@ /** Helper class for JNI related operations. */ public class JniUtils { private static final String LIBRARY_NAME = "spark_columnar_jni"; - private static final String ARROW_LIBRARY_NAME = "libarrow.so.400.0.0"; - private static final String ARROW_PARENT_LIBRARY_NAME = "libarrow.so.400"; - private static final String GANDIVA_LIBRARY_NAME = "libgandiva.so.400.0.0"; - private static final String GANDIVA_PARENT_LIBRARY_NAME = "libgandiva.so.400"; + private static final String ARROW_LIBRARY_NAME = "libarrow.so.700.0.0"; + private static final String ARROW_PARENT_LIBRARY_NAME = "libarrow.so.700"; + private static final String GANDIVA_LIBRARY_NAME = "libgandiva.so.700.0.0"; + private static final String GANDIVA_PARENT_LIBRARY_NAME = "libgandiva.so.700"; private static boolean isLoaded = false; private static boolean isCodegenDependencyLoaded = false; private static List codegenJarsLoadedCache = new ArrayList<>(); diff --git a/native-sql-engine/cpp/src/CMakeLists.txt b/native-sql-engine/cpp/src/CMakeLists.txt index b579bc879..7241aa47a 100644 --- a/native-sql-engine/cpp/src/CMakeLists.txt +++ b/native-sql-engine/cpp/src/CMakeLists.txt @@ -17,7 +17,7 @@ set(ARROW_ROOT "/usr/local" CACHE PATH "Arrow Root dir") set(ARROW_BFS_INSTALL_DIR "/usr/local" CACHE PATH "Arrow Build from Source dir") set(ARROW_LIB_NAME arrow) set(GANDIVA_LIB_NAME gandiva) -set(ARROW_SHARED_LIBRARY_SUFFIX ".so.400") +set(ARROW_SHARED_LIBRARY_SUFFIX ".so.700") option(BUILD_ARROW "Build Arrow from Source" ON) option(STATIC_ARROW "Build Arrow with Static Libraries" OFF) @@ -140,7 +140,7 @@ macro(build_arrow STATIC_ARROW) ExternalProject_Add(arrow_ep GIT_REPOSITORY https://github.com/oap-project/arrow.git SOURCE_DIR ${ARROW_SOURCE_DIR} - GIT_TAG arrow-4.0.0-oap + GIT_TAG arrow-7.0.0-oap BUILD_IN_SOURCE 1 INSTALL_DIR ${ARROW_PREFIX} INSTALL_COMMAND make install @@ -222,14 +222,14 @@ macro(build_arrow STATIC_ARROW) ExternalProject_Add_Step(arrow_ep copy_arrow_binary_400_0_0 COMMAND cp -a ${ARROW_PREFIX}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}${ARROW_LIB_NAME}${ARROW_SHARED_LIBRARY_SUFFIX} ${root_directory}/releases/ - COMMENT "Copy libarrow.so.400.0.0 to releases/" + COMMENT "Copy libarrow.so.700.0.0 to releases/" DEPENDEES mkdir download update patch configure build install java_install WORKING_DIRECTORY "${ARROW_PREFIX}/" ) ExternalProject_Add_Step(arrow_ep copy_gandiva_binary_400_0_0 COMMAND cp -a ${ARROW_PREFIX}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}${GANDIVA_LIB_NAME}${ARROW_SHARED_LIBRARY_SUFFIX} ${root_directory}/releases/ - COMMENT "Copy libgandiva.so.400.0.0 to releases/" + COMMENT "Copy libgandiva.so.700.0.0 to releases/" DEPENDEES mkdir download update patch configure build install java_install WORKING_DIRECTORY "${ARROW_PREFIX}/" ) diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/actions_impl.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/actions_impl.cc index 5a0860ac3..479fa5666 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/actions_impl.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/actions_impl.cc @@ -788,7 +788,7 @@ class MinAction> auto GetMinResult(const arrow::ArrayVector& in) -> typename std::enable_if_t::value> { arrow::Datum minMaxOut; - arrow::compute::MinMaxOptions option; + arrow::compute::ScalarAggregateOptions option; auto maybe_minMaxOut = arrow::compute::MinMax(*in[0].get(), option, ctx_); minMaxOut = *std::move(maybe_minMaxOut); const arrow::StructScalar& value = minMaxOut.scalar_as(); @@ -1392,7 +1392,7 @@ class MaxAction> auto GetMaxResult(const arrow::ArrayVector& in) -> typename std::enable_if_t::value> { arrow::Datum minMaxOut; - arrow::compute::MinMaxOptions option; + arrow::compute::ScalarAggregateOptions option; auto maybe_minMaxOut = arrow::compute::MinMax(*in[0].get(), option, ctx_); minMaxOut = *std::move(maybe_minMaxOut); const arrow::StructScalar& value = minMaxOut.scalar_as(); @@ -1838,7 +1838,8 @@ class SumAction(output.scalar()); cache_[0] += typed_scalar->value; @@ -2147,7 +2148,8 @@ class SumActionPartial(output.scalar()); cache_[0] += typed_scalar->value; @@ -2474,12 +2476,13 @@ class AvgAction(output.scalar()); cache_sum_[0] += typed_scalar->value; - arrow::compute::CountOptions option(arrow::compute::CountOptions::COUNT_NON_NULL); + arrow::compute::CountOptions option(arrow::compute::CountOptions::ONLY_VALID); maybe_output = arrow::compute::Count(*in[0].get(), option, ctx_); output = *std::move(maybe_output); auto count_typed_scalar = std::dynamic_pointer_cast(output.scalar()); @@ -2645,12 +2648,13 @@ class AvgAction(output.scalar()); cache_sum_[0] += typed_scalar->value; - arrow::compute::CountOptions option(arrow::compute::CountOptions::COUNT_NON_NULL); + arrow::compute::CountOptions option(arrow::compute::CountOptions::ONLY_VALID); maybe_output = arrow::compute::Count(*in[0].get(), option, ctx_); output = *std::move(maybe_output); auto count_typed_scalar = std::dynamic_pointer_cast(output.scalar()); @@ -2824,12 +2828,13 @@ class SumCountAction(output.scalar()); cache_sum_[0] += typed_scalar->value; - arrow::compute::CountOptions option(arrow::compute::CountOptions::COUNT_NON_NULL); + arrow::compute::CountOptions option(arrow::compute::CountOptions::ONLY_VALID); maybe_output = arrow::compute::Count(*in[0].get(), option, ctx_); output = *std::move(maybe_output); auto count_typed_scalar = std::dynamic_pointer_cast(output.scalar()); @@ -3179,12 +3184,14 @@ class SumCountMergeAction(output.scalar()); cache_sum_[0] += typed_scalar->value; - maybe_output = arrow::compute::Sum(*in[1].get(), ctx_); + maybe_output = arrow::compute::Sum( + *in[1].get(), arrow::compute::ScalarAggregateOptions::Defaults(), ctx_); output = *std::move(maybe_output); auto count_typed_scalar = std::dynamic_pointer_cast(output.scalar()); cache_count_[0] += count_typed_scalar->value; @@ -3529,12 +3536,14 @@ class AvgByCountAction(output.scalar()); cache_sum_[0] += typed_scalar->value; - maybe_output = arrow::compute::Sum(*in[1].get(), ctx_); + maybe_output = arrow::compute::Sum( + *in[1].get(), arrow::compute::ScalarAggregateOptions::Defaults(), ctx_); output = *std::move(maybe_output); auto count_typed_scalar = std::dynamic_pointer_cast(output.scalar()); cache_count_[0] += count_typed_scalar->value; diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/array_taker.h b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/array_taker.h index 4c9704c2f..6e4844ed5 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/array_taker.h +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/array_taker.h @@ -131,10 +131,10 @@ class ArrayTaker> : public T if (cached_arr_[array_id]->null_count() > 0 && cached_arr_[item->array_id]->IsNull(item->id)) { null_count++; - arrow::BitUtil::SetBitTo(out_is_valid, position, false); + arrow::bit_util::SetBitTo(out_is_valid, position, false); array_data[position] = CType{}; } else { - arrow::BitUtil::SetBitTo(out_is_valid, position, true); + arrow::bit_util::SetBitTo(out_is_valid, position, true); array_data[position] = cached_arr_[array_id]->GetView(item->id); } position++; @@ -199,7 +199,7 @@ class ArrayTaker> : public T while (position < length) { auto item = indices_begin + position; bool val = cached_arr_[item->array_id]->GetView(item->id); - arrow::BitUtil::SetBitTo(array_data, position, val); + arrow::bit_util::SetBitTo(array_data, position, val); position++; } } else { @@ -211,11 +211,11 @@ class ArrayTaker> : public T if (cached_arr_[array_id]->null_count() > 0 && cached_arr_[array_id]->IsNull(item->id)) { null_count++; - arrow::BitUtil::SetBitTo(out_is_valid, position, false); + arrow::bit_util::SetBitTo(out_is_valid, position, false); } else { - arrow::BitUtil::SetBitTo(array_data, position, - cached_arr_[array_id]->GetView(item->id)); - arrow::BitUtil::SetBitTo(out_is_valid, position, true); + arrow::bit_util::SetBitTo(array_data, position, + cached_arr_[array_id]->GetView(item->id)); + arrow::bit_util::SetBitTo(out_is_valid, position, true); } position++; } @@ -295,10 +295,10 @@ class ArrayTaker> : public TakerBas if (cached_arr_[array_id]->null_count() > 0 && cached_arr_[array_id]->IsNull(item->id)) { null_count++; - arrow::BitUtil::SetBitTo(out_is_valid, position, false); + arrow::bit_util::SetBitTo(out_is_valid, position, false); array_data[position] = CType{}; } else { - arrow::BitUtil::SetBitTo(out_is_valid, position, true); + arrow::bit_util::SetBitTo(out_is_valid, position, true); array_data[position] = cached_arr_[array_id]->GetView(item->id); } position++; @@ -488,10 +488,10 @@ class ArrayTaker> : public TakerB if (cached_arr_[array_id]->null_count() > 0 && cached_arr_[array_id]->IsNull(item->id)) { null_count++; - arrow::BitUtil::SetBitTo(out_is_valid, position, false); + arrow::bit_util::SetBitTo(out_is_valid, position, false); array_data[position] = CType{}; } else { - arrow::BitUtil::SetBitTo(out_is_valid, position, true); + arrow::bit_util::SetBitTo(out_is_valid, position, true); array_data[position] = cached_arr_[array_id]->GetView(item->id); } position++; diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_aggregate_kernel.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_aggregate_kernel.cc index 2288865e8..9a3c6cecf 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_aggregate_kernel.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_aggregate_kernel.cc @@ -827,7 +827,13 @@ class HashAggregateKernel::Impl { if (post_process_projector_) { RETURN_NOT_OK(post_process_projector_->Evaluate(&outputs)); } - *out = arrow::RecordBatch::Make(result_schema_, out_length, outputs); + if (result_schema_->fields().empty() && !outputs.empty()) { + // treat as metadata query, for distinct count? see TPC-DS Q38 / Q87 + *out = arrow::RecordBatch::Make(result_schema_, out_length, + std::vector>()); + } else { + *out = arrow::RecordBatch::Make(result_schema_, out_length, outputs); + } return arrow::Status::OK(); } @@ -985,7 +991,13 @@ class HashAggregateKernel::Impl { if (post_process_projector_) { RETURN_NOT_OK(post_process_projector_->Evaluate(&outputs)); } - *out = arrow::RecordBatch::Make(result_schema_, out_length, outputs); + if (result_schema_->fields().empty() && !outputs.empty()) { + // treat as metadata query, for distinct count? see TPC-DS Q38 / Q87 + *out = arrow::RecordBatch::Make(result_schema_, out_length, + std::vector>()); + } else { + *out = arrow::RecordBatch::Make(result_schema_, out_length, outputs); + } return arrow::Status::OK(); } @@ -1135,8 +1147,13 @@ class HashAggregateKernel::Impl { if (post_process_projector_) { RETURN_NOT_OK(post_process_projector_->Evaluate(&outputs)); } - - *out = arrow::RecordBatch::Make(result_schema_, out_length, outputs); + if (result_schema_->fields().empty() && !outputs.empty()) { + // treat as metadata query, for distinct count? see TPC-DS Q38 / Q87 + *out = arrow::RecordBatch::Make(result_schema_, out_length, + std::vector>()); + } else { + *out = arrow::RecordBatch::Make(result_schema_, out_length, outputs); + } return arrow::Status::OK(); } diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.cc index fd5b67de1..b7c21dd38 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.cc @@ -30,7 +30,6 @@ #include #include #include -#include #include #include #include @@ -543,12 +542,11 @@ class ConcatArrayListKernel::Impl { } arrow::ArrayVector concatenated_array_list; for (auto arr_list : cached_) { - std::shared_ptr concatenated_array; arrow::ArrayVector to_be_concat_arr(arr_list.begin() + cur_batch_idx_, arr_list.begin() + end_arr_idx); - RETURN_NOT_OK(arrow::Concatenate(to_be_concat_arr, ctx_->memory_pool(), - &concatenated_array)); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr concatenated_array, + arrow::Concatenate(to_be_concat_arr, ctx_->memory_pool())); concatenated_array_list.push_back(concatenated_array); } int length = concatenated_array_list[0]->length(); diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/sort_kernel.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/sort_kernel.cc index 8d7292e76..07fd657b4 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/sort_kernel.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/sort_kernel.cc @@ -455,9 +455,9 @@ class SortArraysToIndicesKernel::Impl { auto schema = current_in_cache_batch_->schema(); std::vector> out_arrs; for (auto arr : current_in_cache_batch_->columns()) { - std::shared_ptr copied; auto sliced = arr->Slice(cache_offset_); - RETURN_NOT_OK(arrow::Concatenate({sliced}, ctx_->memory_pool(), &copied)); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr copied, + arrow::Concatenate({sliced}, ctx_->memory_pool())); out_arrs.push_back(copied); } *out = arrow::RecordBatch::Make(schema, cur_size, out_arrs); @@ -971,8 +971,8 @@ class SortInplaceKernel : public SortArraysToIndicesKernel::Impl { std::shared_ptr schema, std::shared_ptr>* out) override { if (cached_0_.empty()) return arrow::Status::OK(); - RETURN_NOT_OK( - arrow::Concatenate(cached_0_, ctx_->memory_pool(), &concatenated_array_)); + ARROW_ASSIGN_OR_RAISE(concatenated_array_, + arrow::Concatenate(cached_0_, ctx_->memory_pool())); if (nulls_total_ == 0) { // Function SortNoNull is used. CTYPE* indices_begin = concatenated_array_->data()->GetMutableValues(1); @@ -1153,7 +1153,7 @@ class SortInplaceKernel : public SortArraysToIndicesKernel::Impl { uint8_t* dst = (*out)->mutable_data(); if (bitmap.AllSet()) { - arrow::BitUtil::SetBitsTo(dst, offset, length, true); + arrow::bit_util::SetBitsTo(dst, offset, length, true); } else { arrow::internal::CopyBitmap(bitmap.data, offset, length, dst, 0); } @@ -1283,9 +1283,39 @@ class SortOnekeyKernel : public SortArraysToIndicesKernel::Impl { RETURN_NOT_OK(key_projector_->Evaluate(*in_batch, ctx_->memory_pool(), &outputs)); cached_key_.push_back(std::make_shared(outputs[0])); nulls_total_ += outputs[0]->null_count(); +#ifdef DEBUG + int debug_nulls_count = 0; + for (int i = 0; i < outputs[0]->length(); i++) { + if (outputs[0]->IsNull(i)) { + debug_nulls_count++; + } + } + int64_t nulls_count = outputs[0]->null_count(); + if (nulls_count != debug_nulls_count) { + std::cout << "Unexpected Sort Evaluate state, nulls_count: " << nulls_count + << ", debug_nulls_count: " << debug_nulls_count << std::endl; + std::flush(std::cout); + raise(SIGABRT); + } +#endif cache_size_total_ += GetArrayVectorSize(outputs); } else { nulls_total_ += in[key_id_]->null_count(); +#ifdef DEBUG + int debug_nulls_count = 0; + for (int i = 0; i < in[key_id_]->length(); i++) { + if (in[key_id_]->IsNull(i)) { + debug_nulls_count++; + } + } + int64_t nulls_count = in[key_id_]->null_count(); + if (nulls_count != debug_nulls_count) { + std::cout << "Unexpected Sort Evaluate state, nulls_count: " << nulls_count + << ", debug_nulls_count: " << debug_nulls_count << std::endl; + std::flush(std::cout); + raise(SIGABRT); + } +#endif cached_key_.push_back(std::make_shared(in[key_id_])); } @@ -1450,6 +1480,17 @@ class SortOnekeyKernel : public SortArraysToIndicesKernel::Impl { } } } +#ifdef DEBUG + if (indices_null != nulls_total_) { + std::cout << "Unexpected Sort PartitionNulls state, indices_null: " << indices_null + << ", nulls_total_: " << nulls_total_ << std::endl; + } + if (indices_i + indices_null != items_total_) { + std::cout << "Unexpected Sort PartitionNulls state, indices_i: " << indices_i + << ", indices_null: " << indices_null + << ", items_total_: " << items_total_ << std::endl; + } +#endif } int64_t PartitionNaNs(ArrayItemIndexS* indices_begin, ArrayItemIndexS* indices_end) { diff --git a/native-sql-engine/cpp/src/codegen/common/visitor_base.h b/native-sql-engine/cpp/src/codegen/common/visitor_base.h index 0b056f1a4..0a95d555b 100644 --- a/native-sql-engine/cpp/src/codegen/common/visitor_base.h +++ b/native-sql-engine/cpp/src/codegen/common/visitor_base.h @@ -45,6 +45,12 @@ class VisitorBase : public gandiva::NodeVisitor { arrow::Status Visit(const gandiva::InExpressionNode& node) override { return arrow::Status::OK(); } + arrow::Status Visit(const gandiva::InExpressionNode& node) override { + return arrow::Status::OK(); + } + arrow::Status Visit(const gandiva::InExpressionNode& node) override { + return arrow::Status::OK(); + } arrow::Status Visit(const gandiva::InExpressionNode& node) override { return arrow::Status::OK(); } diff --git a/native-sql-engine/cpp/src/jni/jni_common.h b/native-sql-engine/cpp/src/jni/jni_common.h index 43b6efee2..6b063da56 100644 --- a/native-sql-engine/cpp/src/jni/jni_common.h +++ b/native-sql-engine/cpp/src/jni/jni_common.h @@ -180,7 +180,7 @@ arrow::Status FIXOffsetBuffer(std::shared_ptr* in_buf, int fix_ro static_cast((*in_buf)->size())); (*in_buf) = std::move(valid_copy); } - arrow::BitUtil::SetBitsTo(const_cast((*in_buf)->data()), fix_row, 1, true); + arrow::bit_util::SetBitsTo(const_cast((*in_buf)->data()), fix_row, 1, true); return arrow::Status::OK(); } @@ -473,7 +473,7 @@ Status DecompressBuffer(const arrow::Buffer& buffer, arrow::util::Codec* codec, const uint8_t* data = buffer.data(); int64_t compressed_size = buffer.size() - sizeof(int64_t); int64_t uncompressed_size = - arrow::BitUtil::FromLittleEndian(arrow::util::SafeLoadAs(data)); + arrow::bit_util::FromLittleEndian(arrow::util::SafeLoadAs(data)); ARROW_ASSIGN_OR_RAISE(auto uncompressed, AllocateBuffer(uncompressed_size, pool)); int64_t actual_decompressed; @@ -513,7 +513,7 @@ Status DecompressBuffersByType( continue; } // if the buffer has been rebuilt to uncompressed on java side, return - if (arrow::BitUtil::GetBit(buf_mask, buffer_idx + i)) { + if (arrow::bit_util::GetBit(buf_mask, buffer_idx + i)) { continue; } if (buffer->size() < 8) { @@ -570,7 +570,7 @@ arrow::Status DecompressBuffers( return arrow::Status::OK(); } // if the buffer has been rebuilt to uncompressed on java side, return - if (arrow::BitUtil::GetBit(buf_mask, i)) { + if (arrow::bit_util::GetBit(buf_mask, i)) { ARROW_ASSIGN_OR_RAISE(auto valid_copy, buffers[i]->CopySlice(0, buffers[i]->size())); buffers[i] = valid_copy; diff --git a/native-sql-engine/cpp/src/jni/jni_wrapper.cc b/native-sql-engine/cpp/src/jni/jni_wrapper.cc index b1bc66cdc..163a0b94c 100644 --- a/native-sql-engine/cpp/src/jni/jni_wrapper.cc +++ b/native-sql-engine/cpp/src/jni/jni_wrapper.cc @@ -1464,8 +1464,7 @@ Java_com_intel_oap_vectorized_ArrowCoalesceBatchesJniWrapper_nativeCoalesceBatch for (const auto& batch : batches) { arrvec.push_back(batch->column(i)); } - std::shared_ptr bigArr; - Concatenate(arrvec, pool, &bigArr); + std::shared_ptr bigArr = JniGetOrThrow(Concatenate(arrvec, pool)); // ARROW_ASSIGN_OR_RAISE(auto bigArr, Concatenate(arrvec, pool)); arrayColumns.push_back(bigArr); } diff --git a/native-sql-engine/cpp/src/operators/row_to_columnar_converter.cc b/native-sql-engine/cpp/src/operators/row_to_columnar_converter.cc index 2cbcd618a..ec281581f 100644 --- a/native-sql-engine/cpp/src/operators/row_to_columnar_converter.cc +++ b/native-sql-engine/cpp/src/operators/row_to_columnar_converter.cc @@ -84,11 +84,11 @@ arrow::Status CreateArrayData(std::shared_ptr schema, int64_t num bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); if (is_null) { null_count++; - arrow::BitUtil::SetBitTo(out_is_valid, position, false); + arrow::bit_util::SetBitTo(out_is_valid, position, false); } else { bool value = *(bool*)(memory_address_ + offsets[position] + fieldOffset); - arrow::BitUtil::SetBitTo(array_data, position, value); - arrow::BitUtil::SetBitTo(out_is_valid, position, true); + arrow::bit_util::SetBitTo(array_data, position, value); + arrow::bit_util::SetBitTo(out_is_valid, position, true); } position++; } @@ -117,12 +117,12 @@ arrow::Status CreateArrayData(std::shared_ptr schema, int64_t num bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); if (is_null) { null_count++; - arrow::BitUtil::SetBitTo(out_is_valid, position, false); + arrow::bit_util::SetBitTo(out_is_valid, position, false); array_data[position] = arrow::TypeTraits::CType{}; } else { auto value = *(int8_t*)(memory_address_ + offsets[position] + fieldOffset); array_data[position] = value; - arrow::BitUtil::SetBitTo(out_is_valid, position, true); + arrow::bit_util::SetBitTo(out_is_valid, position, true); } position++; } @@ -151,12 +151,12 @@ arrow::Status CreateArrayData(std::shared_ptr schema, int64_t num bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); if (is_null) { null_count++; - arrow::BitUtil::SetBitTo(out_is_valid, position, false); + arrow::bit_util::SetBitTo(out_is_valid, position, false); array_data[position] = arrow::TypeTraits::CType{}; } else { auto value = *(int16_t*)(memory_address_ + offsets[position] + fieldOffset); array_data[position] = value; - arrow::BitUtil::SetBitTo(out_is_valid, position, true); + arrow::bit_util::SetBitTo(out_is_valid, position, true); } position++; } @@ -185,12 +185,12 @@ arrow::Status CreateArrayData(std::shared_ptr schema, int64_t num bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); if (is_null) { null_count++; - arrow::BitUtil::SetBitTo(out_is_valid, position, false); + arrow::bit_util::SetBitTo(out_is_valid, position, false); array_data[position] = arrow::TypeTraits::CType{}; } else { auto value = *(int32_t*)(memory_address_ + offsets[position] + fieldOffset); array_data[position] = value; - arrow::BitUtil::SetBitTo(out_is_valid, position, true); + arrow::bit_util::SetBitTo(out_is_valid, position, true); } position++; } @@ -218,12 +218,12 @@ arrow::Status CreateArrayData(std::shared_ptr schema, int64_t num bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); if (is_null) { null_count++; - arrow::BitUtil::SetBitTo(out_is_valid, position, false); + arrow::bit_util::SetBitTo(out_is_valid, position, false); array_data[position] = arrow::TypeTraits::CType{}; } else { auto value = *(int64_t*)(memory_address_ + offsets[position] + fieldOffset); array_data[position] = value; - arrow::BitUtil::SetBitTo(out_is_valid, position, true); + arrow::bit_util::SetBitTo(out_is_valid, position, true); } position++; } @@ -251,12 +251,12 @@ arrow::Status CreateArrayData(std::shared_ptr schema, int64_t num bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); if (is_null) { null_count++; - arrow::BitUtil::SetBitTo(out_is_valid, position, false); + arrow::bit_util::SetBitTo(out_is_valid, position, false); array_data[position] = arrow::TypeTraits::CType{}; } else { auto value = *(float*)(memory_address_ + offsets[position] + fieldOffset); array_data[position] = value; - arrow::BitUtil::SetBitTo(out_is_valid, position, true); + arrow::bit_util::SetBitTo(out_is_valid, position, true); } position++; } @@ -284,12 +284,12 @@ arrow::Status CreateArrayData(std::shared_ptr schema, int64_t num bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); if (is_null) { null_count++; - arrow::BitUtil::SetBitTo(out_is_valid, position, false); + arrow::bit_util::SetBitTo(out_is_valid, position, false); array_data[position] = arrow::TypeTraits::CType{}; } else { auto value = *(double*)(memory_address_ + offsets[position] + fieldOffset); array_data[position] = value; - arrow::BitUtil::SetBitTo(out_is_valid, position, true); + arrow::bit_util::SetBitTo(out_is_valid, position, true); } position++; } @@ -371,10 +371,10 @@ arrow::Status CreateArrayData(std::shared_ptr schema, int64_t num bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); if (is_null) { null_count++; - arrow::BitUtil::SetBitTo(out_is_valid, position, false); + arrow::bit_util::SetBitTo(out_is_valid, position, false); array_data[position] = arrow::Decimal128{}; } else { - arrow::BitUtil::SetBitTo(out_is_valid, position, true); + arrow::bit_util::SetBitTo(out_is_valid, position, true); if (precision <= 18) { int64_t low_value; memcpy(&low_value, memory_address_ + offsets[position] + fieldOffset, 8); @@ -429,12 +429,12 @@ arrow::Status CreateArrayData(std::shared_ptr schema, int64_t num bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); if (is_null) { null_count++; - arrow::BitUtil::SetBitTo(out_is_valid, position, false); + arrow::bit_util::SetBitTo(out_is_valid, position, false); array_data[position] = arrow::TypeTraits::CType{}; } else { auto value = *(int32_t*)(memory_address_ + offsets[position] + fieldOffset); array_data[position] = value; - arrow::BitUtil::SetBitTo(out_is_valid, position, true); + arrow::bit_util::SetBitTo(out_is_valid, position, true); } position++; } @@ -462,12 +462,12 @@ arrow::Status CreateArrayData(std::shared_ptr schema, int64_t num bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); if (is_null) { null_count++; - arrow::BitUtil::SetBitTo(out_is_valid, position, false); + arrow::bit_util::SetBitTo(out_is_valid, position, false); array_data[position] = arrow::TypeTraits::CType{}; } else { auto value = *(int64_t*)(memory_address_ + offsets[position] + fieldOffset); array_data[position] = value; - arrow::BitUtil::SetBitTo(out_is_valid, position, true); + arrow::bit_util::SetBitTo(out_is_valid, position, true); } position++; } diff --git a/native-sql-engine/cpp/src/shuffle/splitter.cc b/native-sql-engine/cpp/src/shuffle/splitter.cc index f49789d2c..a4173f02b 100644 --- a/native-sql-engine/cpp/src/shuffle/splitter.cc +++ b/native-sql-engine/cpp/src/shuffle/splitter.cc @@ -627,7 +627,7 @@ arrow::Status Splitter::AllocatePartitionBuffers(int32_t partition_id, int32_t n std::shared_ptr value_buffer; if (column_type_id_[i]->id() == arrow::BooleanType::type_id) { ARROW_ASSIGN_OR_RAISE(value_buffer, arrow::AllocateResizableBuffer( - arrow::BitUtil::BytesForBits(new_size), + arrow::bit_util::BytesForBits(new_size), options_.memory_pool)); } else { ARROW_ASSIGN_OR_RAISE( @@ -641,7 +641,7 @@ arrow::Status Splitter::AllocatePartitionBuffers(int32_t partition_id, int32_t n std::shared_ptr validity_buffer; ARROW_ASSIGN_OR_RAISE( validity_buffer, - arrow::AllocateResizableBuffer(arrow::BitUtil::BytesForBits(new_size), + arrow::AllocateResizableBuffer(arrow::bit_util::BytesForBits(new_size), options_.memory_pool)); new_validity_buffers.push_back(std::move(validity_buffer)); } else { @@ -1119,8 +1119,8 @@ arrow::Status Splitter::SplitFixedWidthValidityBuffer(const arrow::RecordBatch& if (rb.column_data(col_idx)->GetNullCount() == 0) { for (auto pid = 0; pid < num_partitions_; ++pid) { if (partition_id_cnt_[pid] > 0 && dst_addrs[pid] != nullptr) { - arrow::BitUtil::SetBitsTo(dst_addrs[pid], partition_buffer_idx_base_[pid], - partition_id_cnt_[pid], true); + arrow::bit_util::SetBitsTo(dst_addrs[pid], partition_buffer_idx_base_[pid], + partition_id_cnt_[pid], true); } } } else { @@ -1132,11 +1132,11 @@ arrow::Status Splitter::SplitFixedWidthValidityBuffer(const arrow::RecordBatch& : options_.buffer_size; ARROW_ASSIGN_OR_RAISE( auto validity_buffer, - arrow::AllocateResizableBuffer(arrow::BitUtil::BytesForBits(new_size), + arrow::AllocateResizableBuffer(arrow::bit_util::BytesForBits(new_size), options_.memory_pool)); dst_addrs[pid] = const_cast(validity_buffer->data()); - arrow::BitUtil::SetBitsTo(dst_addrs[pid], 0, partition_buffer_idx_base_[pid], - true); + arrow::bit_util::SetBitsTo(dst_addrs[pid], 0, partition_buffer_idx_base_[pid], + true); partition_fixed_width_buffers_[col][pid][0] = std::move(validity_buffer); } } diff --git a/native-sql-engine/cpp/src/tests/arrow_compute_test_aggregate.cc b/native-sql-engine/cpp/src/tests/arrow_compute_test_aggregate.cc index 606e8f84f..2e1a4d81a 100644 --- a/native-sql-engine/cpp/src/tests/arrow_compute_test_aggregate.cc +++ b/native-sql-engine/cpp/src/tests/arrow_compute_test_aggregate.cc @@ -145,9 +145,17 @@ TEST(TestArrowCompute, AggregateTest) { std::shared_ptr expected_result; std::shared_ptr result_batch; - std::vector expected_result_string = { - "[221]", "[39]", "[221]", "[39]", "[4.40724]", "[1]", - "[10]", "[17.2996]", "[40]", R"(["AU"])", R"(["wH"])"}; + std::vector expected_result_string = {"[221]", + "[39]", + "[221]", + "[39]", + "[4.407239819004525]", + "[1]", + "[10]", + "[17.299580526332743]", + "[40]", + R"(["AU"])", + R"(["wH"])"}; auto res_sch = arrow::schema(ret_types); MakeInputBatch(expected_result_string, res_sch, &expected_result); if (aggr_result_iterator->HasNext()) { @@ -470,11 +478,14 @@ TEST(TestArrowCompute, GroupByAggregateTest) { "[1, 2, 3, 4, 5, null, 6, 7, 8, 9, 10]", "[25, 18, 12, 64, 125, 5, 150, 63, 32, 144, 360]", "[1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10]", - "[16.4, 6.5, 5, 5.875, 5.48, null, 6.1, 6.61905, 3.0625, 2.63889, " - "2.06667]", - "[8.49255, 6.93137, 7.6489, 13.5708, 17.4668, 1.41421, 8.52779, 6.23633, " - "5.58903, " - "12.535, 24.3544]"}; + "[16.4, 6.5, 5, 5.875, 5.48, null, 6.1, 6.619047619047619, 3.0625, " + "2.638888888888889, " + "2.066666666666667]", + "[8.492553019565051, 6.931367805067252, 7.648904962570575, 13.570764059328763, " + "17.466813592401582, " + "1.4142135623730951, 8.527788283128372, 6.236331106499947, " + "5.589028976303414, " + "12.535039355587863, 24.354402067771613]"}; auto res_sch = arrow::schema(ret_types); MakeInputBatch(expected_result_string, res_sch, &expected_result); if (aggr_result_iterator->HasNext()) { @@ -962,11 +973,14 @@ TEST(TestArrowCompute, GroupByTwoAggregateTest) { "[5, 3, 2, 4, 5, 1, 5, 3, 2, 4, 6]", "[1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10]", "[1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10]", - "[16.4, 6.5, 5, 5.875, 5.48, null, 6.1, 6.61905, 3.0625, 2.63889, " - "2.06667]", - "[8.49255, 6.93137, 7.6489, 13.5708, 17.4668, 1.41421, 8.52779, 6.23633, " - "5.58903, " - "12.535, 24.3544]"}; + "[16.4, 6.5, 5, 5.875, 5.48, null, 6.1, 6.619047619047619, 3.0625, " + "2.638888888888889, " + "2.066666666666667]", + "[8.492553019565051, 6.931367805067252, 7.648904962570575, 13.570764059328763, " + "17.466813592401582, " + "1.4142135623730951, 8.527788283128372, 6.236331106499947, " + "5.589028976303414, " + "12.535039355587863, 24.354402067771613]"}; auto res_sch = arrow::schema(ret_types); MakeInputBatch(expected_result_string, res_sch, &expected_result); if (aggr_result_iterator->HasNext()) { diff --git a/native-sql-engine/cpp/src/tests/arrow_compute_test_aggregate_decimal.cc b/native-sql-engine/cpp/src/tests/arrow_compute_test_aggregate_decimal.cc index 089c9729f..ac9113d5b 100644 --- a/native-sql-engine/cpp/src/tests/arrow_compute_test_aggregate_decimal.cc +++ b/native-sql-engine/cpp/src/tests/arrow_compute_test_aggregate_decimal.cc @@ -144,8 +144,8 @@ TEST(TestArrowCompute, AggregateTest) { "[785]", R"(["0.439825"])", R"([39])", - R"([8.85288])", - R"([11113.3])"}; + R"([8.852881974358972])", + R"([11113.292918619738])"}; auto res_sch = arrow::schema(ret_types); MakeInputBatch(expected_result_string, res_sch, &expected_result); if (aggr_result_iterator->HasNext()) { diff --git a/native-sql-engine/cpp/src/tests/arrow_compute_test_window.cc b/native-sql-engine/cpp/src/tests/arrow_compute_test_window.cc index 4dcbcaa66..b29ad7590 100644 --- a/native-sql-engine/cpp/src/tests/arrow_compute_test_window.cc +++ b/native-sql-engine/cpp/src/tests/arrow_compute_test_window.cc @@ -64,7 +64,8 @@ TEST(TestArrowComputeWindow, DoubleTest) { ASSERT_NOT_OK(expr->finish(&out)) std::shared_ptr expected_result; - std::vector expected_output_data = {"[118.276, 37.244, 118.276]"}; + std::vector expected_output_data = { + "[118.27600000000001, 37.244, 118.27600000000001]"}; MakeInputBatch(expected_output_data, arrow::schema({res}), &expected_result); ASSERT_NOT_OK(Equals(*expected_result.get(), *(out.at(0).get()))); diff --git a/native-sql-engine/cpp/src/tests/arrow_compute_test_wscg.cc b/native-sql-engine/cpp/src/tests/arrow_compute_test_wscg.cc index 4deb695a0..b262034c1 100644 --- a/native-sql-engine/cpp/src/tests/arrow_compute_test_wscg.cc +++ b/native-sql-engine/cpp/src/tests/arrow_compute_test_wscg.cc @@ -4299,9 +4299,14 @@ TEST(TestArrowComputeWSCG, WSCGTestAggregate) { std::shared_ptr expected_result; std::shared_ptr result_batch; - std::vector expected_result_string = { - "[221]", "[39]", "[221]", "[39]", - "[4.30973]", "[17.2996]", R"(["AU"])", R"(["wH"])"}; + std::vector expected_result_string = {"[221]", + "[39]", + "[221]", + "[39]", + "[4.3097345132743365]", + "[17.299580526332743]", + R"(["AU"])", + R"(["wH"])"}; MakeInputBatch(expected_result_string, arrow::schema(ret_types), &expected_result); if (aggr_result_iterator->HasNext()) { ASSERT_NOT_OK(aggr_result_iterator->Next(&result_batch)); @@ -4501,11 +4506,13 @@ TEST(TestArrowComputeWSCG, WSCGTestGroupbyHashAggregateTwoKeys) { "[5, 3, 2, 4, 5, 1, 5, 3, 2, 4, 6]", "[1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10]", "[1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10]", - "[16.4, 6.5, 5, 5.875, 5.48, 0.4, 6.1, 6.61905, 3.0625, 2.63889, " - "2.06667]", - "[8.49255, 6.93137, 7.6489, 13.5708, 17.4668, 1.41421, 8.52779, 6.23633, " - "5.58903, " - "12.535, 24.3544]"}; + "[16.4, 6.5, 5, 5.875, 5.48, 0.4, 6.1, 6.619047619047619, 3.0625, " + "2.638888888888889, " + "2.066666666666667]", + "[8.492553019565051, 6.931367805067252, 7.648904962570575, 13.570764059328763, " + "17.466813592401582, 1.4142135623730951, 8.527788283128372, 6.236331106499947, " + "5.589028976303414, " + "12.535039355587863, 24.354402067771613]"}; auto res_sch = arrow::schema(ret_types); MakeInputBatch(expected_result_string, res_sch, &expected_result); if (aggr_result_iterator->HasNext()) { @@ -4629,11 +4636,13 @@ TEST(TestArrowComputeWSCG, WSCGTestGroupbyHashAggregate) { "[1, 2, 3, 4, 5, null, 6, 7, 8, 9, 10]", "[25, 18, 12, 64, 125, 5, 150, 63, 32, 144, 360]", "[1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10]", - "[16.4, 6.5, 5, 5.875, 5.48, 0.4, 6.1, 6.61905, 3.0625, 2.63889, " - "2.06667]", - "[8.49255, 6.93137, 7.6489, 13.5708, 17.4668, 1.41421, 8.52779, 6.23633, " - "5.58903, " - "12.535, 24.3544]"}; + "[16.4, 6.5, 5, 5.875, 5.48, 0.4, 6.1, 6.619047619047619, 3.0625, " + "2.638888888888889, " + "2.066666666666667]", + "[8.492553019565051, 6.931367805067252, 7.648904962570575, 13.570764059328763, " + "17.466813592401582, 1.4142135623730951, 8.527788283128372, 6.236331106499947, " + "5.589028976303414, " + "12.535039355587863, 24.354402067771613]"}; auto res_sch = arrow::schema(ret_types); MakeInputBatch(expected_result_string, res_sch, &expected_result); if (aggr_result_iterator->HasNext()) { diff --git a/native-sql-engine/cpp/src/third_party/arrow/utils/hashing.h b/native-sql-engine/cpp/src/third_party/arrow/utils/hashing.h index 28c273fea..328d7e7ca 100644 --- a/native-sql-engine/cpp/src/third_party/arrow/utils/hashing.h +++ b/native-sql-engine/cpp/src/third_party/arrow/utils/hashing.h @@ -39,6 +39,7 @@ #include "arrow/type_traits.h" #include "arrow/util/bit_util.h" #include "arrow/util/bitmap_builders.h" +#include "arrow/util/endian.h" #include "arrow/util/logging.h" #include "arrow/util/macros.h" #include "arrow/util/ubsan.h" @@ -96,7 +97,7 @@ struct ScalarHelper::value> // then byte-swapping (which is a single CPU instruction) allows the // combined high and low bits to participate in the initial hash table index. auto h = static_cast(value); - return BitUtil::ByteSwap(multipliers[AlgNum] * h); + return bit_util::ByteSwap(multipliers[AlgNum] * h); } }; @@ -211,7 +212,7 @@ class HashTable { DCHECK_NE(pool, nullptr); // Minimum of 32 elements capacity = std::max(capacity, 32UL); - capacity_ = BitUtil::NextPower2(capacity); + capacity_ = bit_util::NextPower2(capacity); capacity_mask_ = capacity_ - 1; size_ = 0; @@ -328,8 +329,7 @@ class HashTable { // Stash old entries and seal builder, effectively resetting the Buffer const Entry* old_entries = entries_; - std::shared_ptr previous; - RETURN_NOT_OK(entries_builder_.Finish(&previous)); + ARROW_ASSIGN_OR_RAISE(auto previous, entries_builder_.FinishWithLength(capacity_)); // Allocate new buffer RETURN_NOT_OK(UpsizeBuffer(new_capacity)); @@ -460,6 +460,13 @@ class ScalarMemoTable : public MemoTable { out_data[index] = entry->payload.value; } }); + // Zero-initialize the null entry + if (null_index_ != kKeyNotFound) { + int32_t index = null_index_ - start; + if (index >= 0) { + out_data[index] = Scalar{}; + } + } } void CopyValues(Scalar* out_data) const { CopyValues(0, out_data); } @@ -774,6 +781,8 @@ class BinaryMemoTable : public MemoTable { if (left_size > 0) { memcpy(out_data, in_data + left_offset, left_size); } + // Zero-initialize the null entry + memset(out_data + left_size, 0, width_size); auto right_size = values_size() - static_cast(null_data_offset); if (right_size > 0) { diff --git a/pom.xml b/pom.xml index fe7e0a2f7..7abc25b8a 100644 --- a/pom.xml +++ b/pom.xml @@ -132,7 +132,7 @@ ${java.version} 2.10.0 2.12 - 4.0.0 + 7.0.0-gazelle 2.17.1 arrow-memory-unsafe ${hadoop.version} From d7c1ac49d39a2f7f5b156ae5526644f5892824a9 Mon Sep 17 00:00:00 2001 From: weiting-chen Date: Fri, 29 Apr 2022 22:03:00 +0800 Subject: [PATCH 2/3] fix get_physical_plan issue --- tools/sparklog.ipynb | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tools/sparklog.ipynb b/tools/sparklog.ipynb index 753710aca..4e5d9059e 100644 --- a/tools/sparklog.ipynb +++ b/tools/sparklog.ipynb @@ -692,9 +692,9 @@ " for c in keys:\n", " v=columns[c]\n", " if v.startswith(\"array\") or v.startswith(\"map\") or v.startswith(\"struct\"):\n", - " s[1]=re.sub(c, ''+html.escape(v)+\"\",s[1])\n", + " s[1]=re.sub(re.escape(c), ''+html.escape(v)+\"\",s[1])\n", " else:\n", - " s[1]=re.sub(c, \"\"+html.escape(v)+\"\",s[1])\n", + " s[1]=re.sub(re.escape(c), \"\"+html.escape(v)+\"\",s[1])\n", "\n", "\n", " htmls=['''''']\n", @@ -743,9 +743,9 @@ " for c in keys:\n", " v=columns[c]\n", " if v.startswith(\"array\") or v.startswith(\"map\") or v.startswith(\"struct\"):\n", - " f[idx]=re.sub(c, ''+html.escape(v)+\"\",f[idx])\n", + " f[idx]=re.sub(re.escape(c), ''+html.escape(v)+\"\",f[idx])\n", " else:\n", - " f[idx]=re.sub(c, \"\"+html.escape(v)+\"\",f[idx])\n", + " f[idx]=re.sub(re.escape(c), \"\"+html.escape(v)+\"\",f[idx])\n", " funchtml=\"
\"\n", " for k,v in functions.items():\n", " if shownops is not None:\n", From 48a68caae47d0caf143331961336da23f25df2c4 Mon Sep 17 00:00:00 2001 From: weiting-chen Date: Fri, 29 Apr 2022 22:13:42 +0800 Subject: [PATCH 3/3] Revert "[NSE-728] Upgrade to Arrow 7.0.0 (#729)" This reverts commit e329253bfc7f30cfafd05596280747cb350b3cde. --- .github/workflows/tpch.yml | 2 +- .github/workflows/unittests.yml | 6 +- arrow-data-source/README.md | 2 +- .../intel/oap/spark/sql/ArrowWriteQueue.scala | 6 +- arrow-data-source/script/build_arrow.sh | 4 +- .../datasources/arrow/ArrowFileFormat.scala | 5 +- .../datasources/v2/arrow/ArrowFilters.scala | 206 ++++++++++++++++++ .../arrow/ArrowPartitionReaderFactory.scala | 3 +- .../datasources/v2/arrow/ArrowUtils.scala | 3 +- .../arrow/ArrowDataSourceTest.scala | 12 +- docs/ApacheArrowInstallation.md | 2 +- docs/Installation.md | 4 +- .../com/intel/oap/vectorized/JniUtils.java | 8 +- native-sql-engine/cpp/src/CMakeLists.txt | 8 +- .../codegen/arrow_compute/ext/actions_impl.cc | 37 ++-- .../codegen/arrow_compute/ext/array_taker.h | 22 +- .../ext/hash_aggregate_kernel.cc | 25 +-- .../codegen/arrow_compute/ext/kernels_ext.cc | 6 +- .../codegen/arrow_compute/ext/sort_kernel.cc | 51 +---- .../cpp/src/codegen/common/visitor_base.h | 6 - native-sql-engine/cpp/src/jni/jni_common.h | 8 +- native-sql-engine/cpp/src/jni/jni_wrapper.cc | 3 +- .../operators/row_to_columnar_converter.cc | 42 ++-- native-sql-engine/cpp/src/shuffle/splitter.cc | 14 +- .../src/tests/arrow_compute_test_aggregate.cc | 40 ++-- .../arrow_compute_test_aggregate_decimal.cc | 4 +- .../src/tests/arrow_compute_test_window.cc | 3 +- .../cpp/src/tests/arrow_compute_test_wscg.cc | 35 ++- .../cpp/src/third_party/arrow/utils/hashing.h | 17 +- pom.xml | 2 +- 30 files changed, 342 insertions(+), 244 deletions(-) create mode 100644 arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowFilters.scala diff --git a/.github/workflows/tpch.yml b/.github/workflows/tpch.yml index a8b7c4715..55995e960 100644 --- a/.github/workflows/tpch.yml +++ b/.github/workflows/tpch.yml @@ -51,7 +51,7 @@ jobs: run: | cd /tmp git clone https://github.com/oap-project/arrow.git - cd arrow && git checkout arrow-7.0.0-oap && cd cpp + cd arrow && git checkout arrow-4.0.0-oap && cd cpp mkdir build && cd build cmake .. -DARROW_JNI=ON -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_ORC=ON -DARROW_CSV=ON -DARROW_HDFS=ON -DARROW_FILESYSTEM=ON -DARROW_WITH_SNAPPY=ON -DARROW_JSON=ON -DARROW_DATASET=ON -DARROW_WITH_LZ4=ON -DARROW_JEMALLOC=OFF && make -j2 sudo make install diff --git a/.github/workflows/unittests.yml b/.github/workflows/unittests.yml index 7d67a6123..50263fae5 100644 --- a/.github/workflows/unittests.yml +++ b/.github/workflows/unittests.yml @@ -45,7 +45,7 @@ jobs: run: | cd /tmp git clone https://github.com/oap-project/arrow.git - cd arrow && git checkout arrow-7.0.0-oap && cd cpp + cd arrow && git checkout arrow-4.0.0-oap && cd cpp mkdir build && cd build cmake .. -DARROW_JNI=ON -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_ORC=ON -DARROW_CSV=ON -DARROW_HDFS=ON -DARROW_FILESYSTEM=ON -DARROW_WITH_SNAPPY=ON -DARROW_JSON=ON -DARROW_DATASET=ON -DARROW_WITH_LZ4=ON -DGTEST_ROOT=/usr/src/gtest && make -j2 sudo make install @@ -88,7 +88,7 @@ jobs: run: | cd /tmp git clone https://github.com/oap-project/arrow.git - cd arrow && git checkout arrow-7.0.0-oap && cd cpp + cd arrow && git checkout arrow-4.0.0-oap && cd cpp mkdir build && cd build cmake .. -DARROW_JNI=ON -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_ORC=ON -DARROW_CSV=ON -DARROW_HDFS=ON -DARROW_FILESYSTEM=ON -DARROW_WITH_SNAPPY=ON -DARROW_JSON=ON -DARROW_DATASET=ON -DARROW_WITH_LZ4=ON -DGTEST_ROOT=/usr/src/gtest && make -j2 sudo make install @@ -133,7 +133,7 @@ jobs: run: | cd /tmp git clone https://github.com/oap-project/arrow.git - cd arrow && git checkout arrow-7.0.0-oap && cd cpp + cd arrow && git checkout arrow-4.0.0-oap && cd cpp mkdir build && cd build cmake .. -DARROW_JNI=ON -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_ORC=ON -DARROW_CSV=ON -DARROW_HDFS=ON -DARROW_FILESYSTEM=ON -DARROW_WITH_SNAPPY=ON -DARROW_JSON=ON -DARROW_DATASET=ON -DARROW_WITH_LZ4=ON -DGTEST_ROOT=/usr/src/gtest && make -j2 sudo make install diff --git a/arrow-data-source/README.md b/arrow-data-source/README.md index 549f528e5..20e07cf9e 100644 --- a/arrow-data-source/README.md +++ b/arrow-data-source/README.md @@ -117,7 +117,7 @@ You have to use a customized Arrow to support for our datasets Java API. ``` // build arrow-cpp -git clone -b arrow-7.0.0-oap https://github.com/oap-project/arrow.git +git clone -b arrow-4.0.0-oap https://github.com/oap-project/arrow.git cd arrow/cpp mkdir build cd build diff --git a/arrow-data-source/common/src/main/scala/com/intel/oap/spark/sql/ArrowWriteQueue.scala b/arrow-data-source/common/src/main/scala/com/intel/oap/spark/sql/ArrowWriteQueue.scala index 84c53bbb3..d03ab27b0 100644 --- a/arrow-data-source/common/src/main/scala/com/intel/oap/spark/sql/ArrowWriteQueue.scala +++ b/arrow-data-source/common/src/main/scala/com/intel/oap/spark/sql/ArrowWriteQueue.scala @@ -27,6 +27,7 @@ import java.util.regex.Pattern import com.intel.oap.spark.sql.ArrowWriteQueue.EOS_BATCH import com.intel.oap.spark.sql.ArrowWriteQueue.ScannerImpl +import org.apache.arrow.dataset.file.DatasetFileWriter import org.apache.arrow.dataset.file.format.FileFormat import org.apache.arrow.dataset.scanner.Scanner import org.apache.arrow.dataset.scanner.ScanTask @@ -46,15 +47,12 @@ class ArrowWriteQueue(schema: Schema, fileFormat: FileFormat, outputFileURI: Str val dirURI = matcher.group(1) val fileName = matcher.group(2) -// disable write by arrow 7.0.0 -// DatasetFileWriter.write(scanner, fileFormat, dirURI, Array(), 1, fileName) + DatasetFileWriter.write(scanner, fileFormat, dirURI, Array(), 1, fileName) }, "ArrowWriteQueue - " + UUID.randomUUID().toString) writeThread.start() def enqueue(batch: ArrowRecordBatch): Unit = { - // disable write by arrow 7.0.0 - throw new UnsupportedOperationException("write is disabled by arrow 7.0.0 rebase") scanner.enqueue(batch) } diff --git a/arrow-data-source/script/build_arrow.sh b/arrow-data-source/script/build_arrow.sh index 3de988213..d8ec40128 100755 --- a/arrow-data-source/script/build_arrow.sh +++ b/arrow-data-source/script/build_arrow.sh @@ -62,7 +62,7 @@ echo "ARROW_SOURCE_DIR=${ARROW_SOURCE_DIR}" echo "ARROW_INSTALL_DIR=${ARROW_INSTALL_DIR}" mkdir -p $ARROW_SOURCE_DIR mkdir -p $ARROW_INSTALL_DIR -git clone https://github.com/oap-project/arrow.git --branch arrow-7.0.0-oap $ARROW_SOURCE_DIR +git clone https://github.com/oap-project/arrow.git --branch arrow-4.0.0-oap $ARROW_SOURCE_DIR pushd $ARROW_SOURCE_DIR cmake ./cpp \ @@ -98,7 +98,7 @@ make -j$NPROC make install cd java -mvn clean install -P arrow-jni -pl dataset,gandiva -am -Darrow.cpp.build.dir=${ARROW_INSTALL_DIR}/lib -DskipTests -Dcheckstyle.skip +mvn clean install -P arrow-jni -am -Darrow.cpp.build.dir=${ARROW_INSTALL_DIR}/lib -DskipTests -Dcheckstyle.skip popd echo "Finish to build Arrow from Source !!!" else diff --git a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowFileFormat.scala b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowFileFormat.scala index 3dc3ebb5b..1f172b043 100644 --- a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowFileFormat.scala +++ b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowFileFormat.scala @@ -23,7 +23,7 @@ import scala.collection.JavaConverters._ import com.intel.oap.spark.sql.ArrowWriteExtension.FakeRow import com.intel.oap.spark.sql.ArrowWriteQueue -import com.intel.oap.spark.sql.execution.datasources.v2.arrow.{ArrowOptions, ArrowUtils} +import com.intel.oap.spark.sql.execution.datasources.v2.arrow.{ArrowFilters, ArrowOptions, ArrowUtils} import com.intel.oap.spark.sql.execution.datasources.v2.arrow.ArrowSQLConf._ import com.intel.oap.vectorized.ArrowWritableColumnVector import org.apache.arrow.dataset.scanner.ScanOptions @@ -129,8 +129,7 @@ class ArrowFileFormat extends FileFormat with DataSourceRegister with Serializab val dataset = factory.finish(ArrowUtils.toArrowSchema(requiredSchema)); val filter = if (enableFilterPushDown) { - // disable filter pushdown by arrow 7.0.0 - org.apache.arrow.dataset.filter.Filter.EMPTY + ArrowFilters.translateFilters(filters) } else { org.apache.arrow.dataset.filter.Filter.EMPTY } diff --git a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowFilters.scala b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowFilters.scala new file mode 100644 index 000000000..f33c7995a --- /dev/null +++ b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowFilters.scala @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.intel.oap.spark.sql.execution.datasources.v2.arrow + +import org.apache.arrow.dataset.DatasetTypes +import org.apache.arrow.dataset.DatasetTypes.TreeNode +import org.apache.arrow.dataset.filter.FilterImpl + +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types.StructType + +object ArrowFilters { + def pruneWithSchema(pushedFilters: Array[Filter], schema: StructType): Seq[Filter] = { + pushedFilters.filter(pushedFilter => { + isToBeAccepted(pushedFilter, schema) + }) + } + + private def isToBeAccepted(pushedFilter: Filter, schema: StructType): Boolean = { + pushedFilter match { + case EqualTo(attribute, value) => existsIn(attribute, schema) + case GreaterThan(attribute, value) => existsIn(attribute, schema) + case GreaterThanOrEqual(attribute, value) => existsIn(attribute, schema) + case LessThan(attribute, value) => existsIn(attribute, schema) + case LessThanOrEqual(attribute, value) => existsIn(attribute, schema) + case Not(child) => isToBeAccepted(child, schema) + case And(left, right) => isToBeAccepted(left, schema) && isToBeAccepted(right, schema) + case Or(left, right) => isToBeAccepted(left, schema) && isToBeAccepted(right, schema) + case IsNotNull(attribute) => existsIn(attribute, schema) + case IsNull(attribute) => existsIn(attribute, schema) + case _ => false // fixme complete this + } + } + + private def existsIn(attr: String, schema: StructType): Boolean = { + schema.foreach(f => { + if (f.name == attr) { + return true; + } + }) + false + } + + def translateFilters(pushedFilters: Seq[Filter]): org.apache.arrow.dataset.filter.Filter = { + val node = pushedFilters + .flatMap(translateFilter) + .reduceOption((t1: TreeNode, t2: TreeNode) => { + DatasetTypes.TreeNode.newBuilder.setAndNode( + DatasetTypes.AndNode.newBuilder() + .setLeftArg(t1) + .setRightArg(t2) + .build()).build() + }) + if (node.isDefined) { + new FilterImpl(DatasetTypes.Condition.newBuilder() + .setRoot(node.get).build) + } else { + org.apache.arrow.dataset.filter.Filter.EMPTY + } + } + + private def translateValue(value: Any): Option[TreeNode] = { + value match { + case v: Integer => Some( + DatasetTypes.TreeNode.newBuilder.setIntNode( + DatasetTypes.IntNode.newBuilder.setValue(v).build) + .build) + case v: Long => Some( + DatasetTypes.TreeNode.newBuilder.setLongNode( + DatasetTypes.LongNode.newBuilder.setValue(v).build) + .build) + case v: Float => Some( + DatasetTypes.TreeNode.newBuilder.setFloatNode( + DatasetTypes.FloatNode.newBuilder.setValue(v).build) + .build) + case v: Double => Some( + DatasetTypes.TreeNode.newBuilder.setDoubleNode( + DatasetTypes.DoubleNode.newBuilder.setValue(v).build) + .build) + case v: Boolean => Some( + DatasetTypes.TreeNode.newBuilder.setBooleanNode( + DatasetTypes.BooleanNode.newBuilder.setValue(v).build) + .build) + case _ => None // fixme complete this + } + } + + private def translateFilter(pushedFilter: Filter): Option[TreeNode] = { + pushedFilter match { + case EqualTo(attribute, value) => + createComparisonNode("equal", attribute, value) + case GreaterThan(attribute, value) => + createComparisonNode("greater", attribute, value) + case GreaterThanOrEqual(attribute, value) => + createComparisonNode("greater_equal", attribute, value) + case LessThan(attribute, value) => + createComparisonNode("less", attribute, value) + case LessThanOrEqual(attribute, value) => + createComparisonNode("less_equal", attribute, value) + case Not(child) => + createNotNode(child) + case And(left, right) => + createAndNode(left, right) + case Or(left, right) => + createOrNode(left, right) + case IsNotNull(attribute) => + createIsNotNullNode(attribute) + case IsNull(attribute) => + createIsNullNode(attribute) + case _ => None // fixme complete this + } + } + + private def createComparisonNode(opName: String, + attribute: String, value: Any): Option[TreeNode] = { + val translatedValue = translateValue(value) + translatedValue match { + case Some(v) => Some( + DatasetTypes.TreeNode.newBuilder.setCpNode( + DatasetTypes.ComparisonNode.newBuilder + .setOpName(opName) // todo make op names enumerable + .setLeftArg( + DatasetTypes.TreeNode.newBuilder.setFieldNode( + DatasetTypes.FieldNode.newBuilder.setName(attribute).build) + .build) + .setRightArg(v) + .build) + .build) + case None => None + } + } + + def createNotNode(child: Filter): Option[TreeNode] = { + val translatedChild = translateFilter(child) + if (translatedChild.isEmpty) { + return None + } + Some(DatasetTypes.TreeNode.newBuilder + .setNotNode(DatasetTypes.NotNode.newBuilder.setArgs(translatedChild.get).build()).build()) + } + + def createIsNotNullNode(attribute: String): Option[TreeNode] = { + Some(DatasetTypes.TreeNode.newBuilder + .setIsValidNode( + DatasetTypes.IsValidNode.newBuilder.setArgs( + DatasetTypes.TreeNode.newBuilder.setFieldNode( + DatasetTypes.FieldNode.newBuilder.setName(attribute).build) + .build).build()).build()) + } + + def createIsNullNode(attribute: String): Option[TreeNode] = { + Some(DatasetTypes.TreeNode.newBuilder + .setNotNode( + DatasetTypes.NotNode.newBuilder.setArgs( + DatasetTypes.TreeNode.newBuilder + .setIsValidNode( + DatasetTypes.IsValidNode.newBuilder.setArgs( + DatasetTypes.TreeNode.newBuilder.setFieldNode( + DatasetTypes.FieldNode.newBuilder.setName(attribute).build) + .build) + .build()).build()).build()).build()) + } + + def createAndNode(left: Filter, right: Filter): Option[TreeNode] = { + val translatedLeft = translateFilter(left) + val translatedRight = translateFilter(right) + if (translatedLeft.isEmpty || translatedRight.isEmpty) { + return None + } + Some(DatasetTypes.TreeNode.newBuilder + .setAndNode(DatasetTypes.AndNode.newBuilder + .setLeftArg(translatedLeft.get) + .setRightArg(translatedRight.get) + .build()) + .build()) + } + + def createOrNode(left: Filter, right: Filter): Option[TreeNode] = { + val translatedLeft = translateFilter(left) + val translatedRight = translateFilter(right) + if (translatedLeft.isEmpty || translatedRight.isEmpty) { + return None + } + Some(DatasetTypes.TreeNode.newBuilder + .setOrNode(DatasetTypes.OrNode.newBuilder + .setLeftArg(translatedLeft.get) + .setRightArg(translatedRight.get) + .build()) + .build()) + } +} diff --git a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowPartitionReaderFactory.scala b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowPartitionReaderFactory.scala index d627eac81..1e443a9d2 100644 --- a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowPartitionReaderFactory.scala +++ b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowPartitionReaderFactory.scala @@ -61,8 +61,7 @@ case class ArrowPartitionReaderFactory( partitionedFile.start, partitionedFile.length, options) val dataset = factory.finish(ArrowUtils.toArrowSchema(readDataSchema)) val filter = if (enableFilterPushDown) { - // disable filter pushdown by arrow 7.0.0 - org.apache.arrow.dataset.filter.Filter.EMPTY + ArrowFilters.translateFilters(ArrowFilters.pruneWithSchema(pushedFilters, readDataSchema)) } else { org.apache.arrow.dataset.filter.Filter.EMPTY } diff --git a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowUtils.scala b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowUtils.scala index 5243af4d3..f6cc0303e 100644 --- a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowUtils.scala +++ b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowUtils.scala @@ -117,8 +117,7 @@ object ArrowUtils { val paramMap = options.parameters.toMap.asJava options.originalFormat match { case "parquet" => org.apache.arrow.dataset.file.format.ParquetFileFormat.create(paramMap) -// disable orc by arrow 7.0.0 -// case "orc" => org.apache.arrow.dataset.file.format.OrcFileFormat.create(paramMap) + case "orc" => org.apache.arrow.dataset.file.format.OrcFileFormat.create(paramMap) case "csv" => org.apache.arrow.dataset.file.format.CsvFileFormat.create(paramMap) case _ => throw new IllegalArgumentException("Unrecognizable format") } diff --git a/arrow-data-source/standard/src/test/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowDataSourceTest.scala b/arrow-data-source/standard/src/test/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowDataSourceTest.scala index 7a54f0f23..9896ac1b4 100644 --- a/arrow-data-source/standard/src/test/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowDataSourceTest.scala +++ b/arrow-data-source/standard/src/test/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowDataSourceTest.scala @@ -203,7 +203,7 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession { assert(rows.length === 3) } - ignore("simple parquet write") { + test("simple parquet write") { val path = ArrowDataSourceTest.locateResourcePath(parquetFile3) val frame = spark.read .option(ArrowOptions.KEY_ORIGINAL_FORMAT, "parquet") @@ -339,7 +339,7 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession { df.show() } - ignore("orc reader on data type: struct, array, map") { + test("orc reader on data type: struct, array, map") { val path = ArrowDataSourceTest.locateResourcePath(orcFile1) val frame = spark.read .option(ArrowOptions.KEY_ORIGINAL_FORMAT, "orc") @@ -363,7 +363,7 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession { } private val orcFile = "people.orc" - ignore("read orc file") { + test("read orc file") { val path = ArrowDataSourceTest.locateResourcePath(orcFile) verifyFrame( spark.read @@ -372,7 +372,7 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession { .load(path), 2, 3) } - ignore("read orc file - programmatic API ") { + test("read orc file - programmatic API ") { val path = ArrowDataSourceTest.locateResourcePath(orcFile) verifyFrame( spark.read @@ -380,7 +380,7 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession { .arrow(path), 2, 3) } - ignore("create catalog table for orc") { + test("create catalog table for orc") { val path = ArrowDataSourceTest.locateResourcePath(orcFile) // spark.catalog.createTable("people", path, "arrow") spark.catalog.createTable("people", "arrow", Map("path" -> path, "originalFormat" -> "orc")) @@ -389,7 +389,7 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession { verifyFrame(spark.sql(sql), 2, 3) } - ignore("simple SQL query on orc file ") { + test("simple SQL query on orc file ") { val path = ArrowDataSourceTest.locateResourcePath(orcFile) val frame = spark.read .option(ArrowOptions.KEY_ORIGINAL_FORMAT, "orc") diff --git a/docs/ApacheArrowInstallation.md b/docs/ApacheArrowInstallation.md index 1b2a584fe..c40734dda 100644 --- a/docs/ApacheArrowInstallation.md +++ b/docs/ApacheArrowInstallation.md @@ -30,7 +30,7 @@ Please make sure your cmake version is qualified based on the prerequisite. # Arrow ``` shell git clone https://github.com/oap-project/arrow.git -cd arrow && git checkout arrow-7.0.0-oap +cd arrow && git checkout arrow-4.0.0-oap mkdir -p arrow/cpp/release-build cd arrow/cpp/release-build cmake -DARROW_DEPENDENCY_SOURCE=BUNDLED -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_ORC=ON -DARROW_CSV=ON -DARROW_HDFS=ON -DARROW_BOOST_USE_SHARED=ON -DARROW_JNI=ON -DARROW_DATASET=ON -DARROW_WITH_PROTOBUF=ON -DARROW_WITH_SNAPPY=ON -DARROW_WITH_LZ4=ON -DARROW_FILESYSTEM=ON -DARROW_JSON=ON .. diff --git a/docs/Installation.md b/docs/Installation.md index d632a9cd6..f27f6f56d 100644 --- a/docs/Installation.md +++ b/docs/Installation.md @@ -31,8 +31,8 @@ Based on the different environment, there are some parameters can be set via -D | arrow_root | When build_arrow set to False, arrow_root will be enabled to find the location of your existing arrow library. | /usr/local | | build_protobuf | Build Protobuf from Source. If set to False, default library path will be used to find protobuf library. | True | -When build_arrow set to True, the build_arrow.sh will be launched and compile a custom arrow library from [OAP Arrow](https://github.com/oap-project/arrow/tree/arrow-7.0.0-oap) -If you wish to change any parameters from Arrow, you can change it from the `build_arrow.sh` script under `native-sql-engine/arrow-data-source/script/`. +When build_arrow set to True, the build_arrow.sh will be launched and compile a custom arrow library from [OAP Arrow](https://github.com/oap-project/arrow/tree/arrow-4.0.0-oap) +If you wish to change any parameters from Arrow, you can change it from the `build_arrow.sh` script under `gazelle_plugin/arrow-data-source/script/`. ### Additional Notes [Notes for Installation Issues](./InstallationNotes.md) diff --git a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/JniUtils.java b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/JniUtils.java index 7a55f82b5..8d5baa49c 100644 --- a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/JniUtils.java +++ b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/JniUtils.java @@ -39,10 +39,10 @@ /** Helper class for JNI related operations. */ public class JniUtils { private static final String LIBRARY_NAME = "spark_columnar_jni"; - private static final String ARROW_LIBRARY_NAME = "libarrow.so.700.0.0"; - private static final String ARROW_PARENT_LIBRARY_NAME = "libarrow.so.700"; - private static final String GANDIVA_LIBRARY_NAME = "libgandiva.so.700.0.0"; - private static final String GANDIVA_PARENT_LIBRARY_NAME = "libgandiva.so.700"; + private static final String ARROW_LIBRARY_NAME = "libarrow.so.400.0.0"; + private static final String ARROW_PARENT_LIBRARY_NAME = "libarrow.so.400"; + private static final String GANDIVA_LIBRARY_NAME = "libgandiva.so.400.0.0"; + private static final String GANDIVA_PARENT_LIBRARY_NAME = "libgandiva.so.400"; private static boolean isLoaded = false; private static boolean isCodegenDependencyLoaded = false; private static List codegenJarsLoadedCache = new ArrayList<>(); diff --git a/native-sql-engine/cpp/src/CMakeLists.txt b/native-sql-engine/cpp/src/CMakeLists.txt index 7241aa47a..b579bc879 100644 --- a/native-sql-engine/cpp/src/CMakeLists.txt +++ b/native-sql-engine/cpp/src/CMakeLists.txt @@ -17,7 +17,7 @@ set(ARROW_ROOT "/usr/local" CACHE PATH "Arrow Root dir") set(ARROW_BFS_INSTALL_DIR "/usr/local" CACHE PATH "Arrow Build from Source dir") set(ARROW_LIB_NAME arrow) set(GANDIVA_LIB_NAME gandiva) -set(ARROW_SHARED_LIBRARY_SUFFIX ".so.700") +set(ARROW_SHARED_LIBRARY_SUFFIX ".so.400") option(BUILD_ARROW "Build Arrow from Source" ON) option(STATIC_ARROW "Build Arrow with Static Libraries" OFF) @@ -140,7 +140,7 @@ macro(build_arrow STATIC_ARROW) ExternalProject_Add(arrow_ep GIT_REPOSITORY https://github.com/oap-project/arrow.git SOURCE_DIR ${ARROW_SOURCE_DIR} - GIT_TAG arrow-7.0.0-oap + GIT_TAG arrow-4.0.0-oap BUILD_IN_SOURCE 1 INSTALL_DIR ${ARROW_PREFIX} INSTALL_COMMAND make install @@ -222,14 +222,14 @@ macro(build_arrow STATIC_ARROW) ExternalProject_Add_Step(arrow_ep copy_arrow_binary_400_0_0 COMMAND cp -a ${ARROW_PREFIX}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}${ARROW_LIB_NAME}${ARROW_SHARED_LIBRARY_SUFFIX} ${root_directory}/releases/ - COMMENT "Copy libarrow.so.700.0.0 to releases/" + COMMENT "Copy libarrow.so.400.0.0 to releases/" DEPENDEES mkdir download update patch configure build install java_install WORKING_DIRECTORY "${ARROW_PREFIX}/" ) ExternalProject_Add_Step(arrow_ep copy_gandiva_binary_400_0_0 COMMAND cp -a ${ARROW_PREFIX}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}${GANDIVA_LIB_NAME}${ARROW_SHARED_LIBRARY_SUFFIX} ${root_directory}/releases/ - COMMENT "Copy libgandiva.so.700.0.0 to releases/" + COMMENT "Copy libgandiva.so.400.0.0 to releases/" DEPENDEES mkdir download update patch configure build install java_install WORKING_DIRECTORY "${ARROW_PREFIX}/" ) diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/actions_impl.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/actions_impl.cc index 479fa5666..5a0860ac3 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/actions_impl.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/actions_impl.cc @@ -788,7 +788,7 @@ class MinAction> auto GetMinResult(const arrow::ArrayVector& in) -> typename std::enable_if_t::value> { arrow::Datum minMaxOut; - arrow::compute::ScalarAggregateOptions option; + arrow::compute::MinMaxOptions option; auto maybe_minMaxOut = arrow::compute::MinMax(*in[0].get(), option, ctx_); minMaxOut = *std::move(maybe_minMaxOut); const arrow::StructScalar& value = minMaxOut.scalar_as(); @@ -1392,7 +1392,7 @@ class MaxAction> auto GetMaxResult(const arrow::ArrayVector& in) -> typename std::enable_if_t::value> { arrow::Datum minMaxOut; - arrow::compute::ScalarAggregateOptions option; + arrow::compute::MinMaxOptions option; auto maybe_minMaxOut = arrow::compute::MinMax(*in[0].get(), option, ctx_); minMaxOut = *std::move(maybe_minMaxOut); const arrow::StructScalar& value = minMaxOut.scalar_as(); @@ -1838,8 +1838,7 @@ class SumAction(output.scalar()); cache_[0] += typed_scalar->value; @@ -2148,8 +2147,7 @@ class SumActionPartial(output.scalar()); cache_[0] += typed_scalar->value; @@ -2476,13 +2474,12 @@ class AvgAction(output.scalar()); cache_sum_[0] += typed_scalar->value; - arrow::compute::CountOptions option(arrow::compute::CountOptions::ONLY_VALID); + arrow::compute::CountOptions option(arrow::compute::CountOptions::COUNT_NON_NULL); maybe_output = arrow::compute::Count(*in[0].get(), option, ctx_); output = *std::move(maybe_output); auto count_typed_scalar = std::dynamic_pointer_cast(output.scalar()); @@ -2648,13 +2645,12 @@ class AvgAction(output.scalar()); cache_sum_[0] += typed_scalar->value; - arrow::compute::CountOptions option(arrow::compute::CountOptions::ONLY_VALID); + arrow::compute::CountOptions option(arrow::compute::CountOptions::COUNT_NON_NULL); maybe_output = arrow::compute::Count(*in[0].get(), option, ctx_); output = *std::move(maybe_output); auto count_typed_scalar = std::dynamic_pointer_cast(output.scalar()); @@ -2828,13 +2824,12 @@ class SumCountAction(output.scalar()); cache_sum_[0] += typed_scalar->value; - arrow::compute::CountOptions option(arrow::compute::CountOptions::ONLY_VALID); + arrow::compute::CountOptions option(arrow::compute::CountOptions::COUNT_NON_NULL); maybe_output = arrow::compute::Count(*in[0].get(), option, ctx_); output = *std::move(maybe_output); auto count_typed_scalar = std::dynamic_pointer_cast(output.scalar()); @@ -3184,14 +3179,12 @@ class SumCountMergeAction(output.scalar()); cache_sum_[0] += typed_scalar->value; - maybe_output = arrow::compute::Sum( - *in[1].get(), arrow::compute::ScalarAggregateOptions::Defaults(), ctx_); + maybe_output = arrow::compute::Sum(*in[1].get(), ctx_); output = *std::move(maybe_output); auto count_typed_scalar = std::dynamic_pointer_cast(output.scalar()); cache_count_[0] += count_typed_scalar->value; @@ -3536,14 +3529,12 @@ class AvgByCountAction(output.scalar()); cache_sum_[0] += typed_scalar->value; - maybe_output = arrow::compute::Sum( - *in[1].get(), arrow::compute::ScalarAggregateOptions::Defaults(), ctx_); + maybe_output = arrow::compute::Sum(*in[1].get(), ctx_); output = *std::move(maybe_output); auto count_typed_scalar = std::dynamic_pointer_cast(output.scalar()); cache_count_[0] += count_typed_scalar->value; diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/array_taker.h b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/array_taker.h index 6e4844ed5..4c9704c2f 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/array_taker.h +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/array_taker.h @@ -131,10 +131,10 @@ class ArrayTaker> : public T if (cached_arr_[array_id]->null_count() > 0 && cached_arr_[item->array_id]->IsNull(item->id)) { null_count++; - arrow::bit_util::SetBitTo(out_is_valid, position, false); + arrow::BitUtil::SetBitTo(out_is_valid, position, false); array_data[position] = CType{}; } else { - arrow::bit_util::SetBitTo(out_is_valid, position, true); + arrow::BitUtil::SetBitTo(out_is_valid, position, true); array_data[position] = cached_arr_[array_id]->GetView(item->id); } position++; @@ -199,7 +199,7 @@ class ArrayTaker> : public T while (position < length) { auto item = indices_begin + position; bool val = cached_arr_[item->array_id]->GetView(item->id); - arrow::bit_util::SetBitTo(array_data, position, val); + arrow::BitUtil::SetBitTo(array_data, position, val); position++; } } else { @@ -211,11 +211,11 @@ class ArrayTaker> : public T if (cached_arr_[array_id]->null_count() > 0 && cached_arr_[array_id]->IsNull(item->id)) { null_count++; - arrow::bit_util::SetBitTo(out_is_valid, position, false); + arrow::BitUtil::SetBitTo(out_is_valid, position, false); } else { - arrow::bit_util::SetBitTo(array_data, position, - cached_arr_[array_id]->GetView(item->id)); - arrow::bit_util::SetBitTo(out_is_valid, position, true); + arrow::BitUtil::SetBitTo(array_data, position, + cached_arr_[array_id]->GetView(item->id)); + arrow::BitUtil::SetBitTo(out_is_valid, position, true); } position++; } @@ -295,10 +295,10 @@ class ArrayTaker> : public TakerBas if (cached_arr_[array_id]->null_count() > 0 && cached_arr_[array_id]->IsNull(item->id)) { null_count++; - arrow::bit_util::SetBitTo(out_is_valid, position, false); + arrow::BitUtil::SetBitTo(out_is_valid, position, false); array_data[position] = CType{}; } else { - arrow::bit_util::SetBitTo(out_is_valid, position, true); + arrow::BitUtil::SetBitTo(out_is_valid, position, true); array_data[position] = cached_arr_[array_id]->GetView(item->id); } position++; @@ -488,10 +488,10 @@ class ArrayTaker> : public TakerB if (cached_arr_[array_id]->null_count() > 0 && cached_arr_[array_id]->IsNull(item->id)) { null_count++; - arrow::bit_util::SetBitTo(out_is_valid, position, false); + arrow::BitUtil::SetBitTo(out_is_valid, position, false); array_data[position] = CType{}; } else { - arrow::bit_util::SetBitTo(out_is_valid, position, true); + arrow::BitUtil::SetBitTo(out_is_valid, position, true); array_data[position] = cached_arr_[array_id]->GetView(item->id); } position++; diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_aggregate_kernel.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_aggregate_kernel.cc index 9a3c6cecf..2288865e8 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_aggregate_kernel.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_aggregate_kernel.cc @@ -827,13 +827,7 @@ class HashAggregateKernel::Impl { if (post_process_projector_) { RETURN_NOT_OK(post_process_projector_->Evaluate(&outputs)); } - if (result_schema_->fields().empty() && !outputs.empty()) { - // treat as metadata query, for distinct count? see TPC-DS Q38 / Q87 - *out = arrow::RecordBatch::Make(result_schema_, out_length, - std::vector>()); - } else { - *out = arrow::RecordBatch::Make(result_schema_, out_length, outputs); - } + *out = arrow::RecordBatch::Make(result_schema_, out_length, outputs); return arrow::Status::OK(); } @@ -991,13 +985,7 @@ class HashAggregateKernel::Impl { if (post_process_projector_) { RETURN_NOT_OK(post_process_projector_->Evaluate(&outputs)); } - if (result_schema_->fields().empty() && !outputs.empty()) { - // treat as metadata query, for distinct count? see TPC-DS Q38 / Q87 - *out = arrow::RecordBatch::Make(result_schema_, out_length, - std::vector>()); - } else { - *out = arrow::RecordBatch::Make(result_schema_, out_length, outputs); - } + *out = arrow::RecordBatch::Make(result_schema_, out_length, outputs); return arrow::Status::OK(); } @@ -1147,13 +1135,8 @@ class HashAggregateKernel::Impl { if (post_process_projector_) { RETURN_NOT_OK(post_process_projector_->Evaluate(&outputs)); } - if (result_schema_->fields().empty() && !outputs.empty()) { - // treat as metadata query, for distinct count? see TPC-DS Q38 / Q87 - *out = arrow::RecordBatch::Make(result_schema_, out_length, - std::vector>()); - } else { - *out = arrow::RecordBatch::Make(result_schema_, out_length, outputs); - } + + *out = arrow::RecordBatch::Make(result_schema_, out_length, outputs); return arrow::Status::OK(); } diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.cc index b7c21dd38..fd5b67de1 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.cc @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include @@ -542,11 +543,12 @@ class ConcatArrayListKernel::Impl { } arrow::ArrayVector concatenated_array_list; for (auto arr_list : cached_) { + std::shared_ptr concatenated_array; arrow::ArrayVector to_be_concat_arr(arr_list.begin() + cur_batch_idx_, arr_list.begin() + end_arr_idx); - ARROW_ASSIGN_OR_RAISE(std::shared_ptr concatenated_array, - arrow::Concatenate(to_be_concat_arr, ctx_->memory_pool())); + RETURN_NOT_OK(arrow::Concatenate(to_be_concat_arr, ctx_->memory_pool(), + &concatenated_array)); concatenated_array_list.push_back(concatenated_array); } int length = concatenated_array_list[0]->length(); diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/sort_kernel.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/sort_kernel.cc index 07fd657b4..8d7292e76 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/sort_kernel.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/sort_kernel.cc @@ -455,9 +455,9 @@ class SortArraysToIndicesKernel::Impl { auto schema = current_in_cache_batch_->schema(); std::vector> out_arrs; for (auto arr : current_in_cache_batch_->columns()) { + std::shared_ptr copied; auto sliced = arr->Slice(cache_offset_); - ARROW_ASSIGN_OR_RAISE(std::shared_ptr copied, - arrow::Concatenate({sliced}, ctx_->memory_pool())); + RETURN_NOT_OK(arrow::Concatenate({sliced}, ctx_->memory_pool(), &copied)); out_arrs.push_back(copied); } *out = arrow::RecordBatch::Make(schema, cur_size, out_arrs); @@ -971,8 +971,8 @@ class SortInplaceKernel : public SortArraysToIndicesKernel::Impl { std::shared_ptr schema, std::shared_ptr>* out) override { if (cached_0_.empty()) return arrow::Status::OK(); - ARROW_ASSIGN_OR_RAISE(concatenated_array_, - arrow::Concatenate(cached_0_, ctx_->memory_pool())); + RETURN_NOT_OK( + arrow::Concatenate(cached_0_, ctx_->memory_pool(), &concatenated_array_)); if (nulls_total_ == 0) { // Function SortNoNull is used. CTYPE* indices_begin = concatenated_array_->data()->GetMutableValues(1); @@ -1153,7 +1153,7 @@ class SortInplaceKernel : public SortArraysToIndicesKernel::Impl { uint8_t* dst = (*out)->mutable_data(); if (bitmap.AllSet()) { - arrow::bit_util::SetBitsTo(dst, offset, length, true); + arrow::BitUtil::SetBitsTo(dst, offset, length, true); } else { arrow::internal::CopyBitmap(bitmap.data, offset, length, dst, 0); } @@ -1283,39 +1283,9 @@ class SortOnekeyKernel : public SortArraysToIndicesKernel::Impl { RETURN_NOT_OK(key_projector_->Evaluate(*in_batch, ctx_->memory_pool(), &outputs)); cached_key_.push_back(std::make_shared(outputs[0])); nulls_total_ += outputs[0]->null_count(); -#ifdef DEBUG - int debug_nulls_count = 0; - for (int i = 0; i < outputs[0]->length(); i++) { - if (outputs[0]->IsNull(i)) { - debug_nulls_count++; - } - } - int64_t nulls_count = outputs[0]->null_count(); - if (nulls_count != debug_nulls_count) { - std::cout << "Unexpected Sort Evaluate state, nulls_count: " << nulls_count - << ", debug_nulls_count: " << debug_nulls_count << std::endl; - std::flush(std::cout); - raise(SIGABRT); - } -#endif cache_size_total_ += GetArrayVectorSize(outputs); } else { nulls_total_ += in[key_id_]->null_count(); -#ifdef DEBUG - int debug_nulls_count = 0; - for (int i = 0; i < in[key_id_]->length(); i++) { - if (in[key_id_]->IsNull(i)) { - debug_nulls_count++; - } - } - int64_t nulls_count = in[key_id_]->null_count(); - if (nulls_count != debug_nulls_count) { - std::cout << "Unexpected Sort Evaluate state, nulls_count: " << nulls_count - << ", debug_nulls_count: " << debug_nulls_count << std::endl; - std::flush(std::cout); - raise(SIGABRT); - } -#endif cached_key_.push_back(std::make_shared(in[key_id_])); } @@ -1480,17 +1450,6 @@ class SortOnekeyKernel : public SortArraysToIndicesKernel::Impl { } } } -#ifdef DEBUG - if (indices_null != nulls_total_) { - std::cout << "Unexpected Sort PartitionNulls state, indices_null: " << indices_null - << ", nulls_total_: " << nulls_total_ << std::endl; - } - if (indices_i + indices_null != items_total_) { - std::cout << "Unexpected Sort PartitionNulls state, indices_i: " << indices_i - << ", indices_null: " << indices_null - << ", items_total_: " << items_total_ << std::endl; - } -#endif } int64_t PartitionNaNs(ArrayItemIndexS* indices_begin, ArrayItemIndexS* indices_end) { diff --git a/native-sql-engine/cpp/src/codegen/common/visitor_base.h b/native-sql-engine/cpp/src/codegen/common/visitor_base.h index 0a95d555b..0b056f1a4 100644 --- a/native-sql-engine/cpp/src/codegen/common/visitor_base.h +++ b/native-sql-engine/cpp/src/codegen/common/visitor_base.h @@ -45,12 +45,6 @@ class VisitorBase : public gandiva::NodeVisitor { arrow::Status Visit(const gandiva::InExpressionNode& node) override { return arrow::Status::OK(); } - arrow::Status Visit(const gandiva::InExpressionNode& node) override { - return arrow::Status::OK(); - } - arrow::Status Visit(const gandiva::InExpressionNode& node) override { - return arrow::Status::OK(); - } arrow::Status Visit(const gandiva::InExpressionNode& node) override { return arrow::Status::OK(); } diff --git a/native-sql-engine/cpp/src/jni/jni_common.h b/native-sql-engine/cpp/src/jni/jni_common.h index 6b063da56..43b6efee2 100644 --- a/native-sql-engine/cpp/src/jni/jni_common.h +++ b/native-sql-engine/cpp/src/jni/jni_common.h @@ -180,7 +180,7 @@ arrow::Status FIXOffsetBuffer(std::shared_ptr* in_buf, int fix_ro static_cast((*in_buf)->size())); (*in_buf) = std::move(valid_copy); } - arrow::bit_util::SetBitsTo(const_cast((*in_buf)->data()), fix_row, 1, true); + arrow::BitUtil::SetBitsTo(const_cast((*in_buf)->data()), fix_row, 1, true); return arrow::Status::OK(); } @@ -473,7 +473,7 @@ Status DecompressBuffer(const arrow::Buffer& buffer, arrow::util::Codec* codec, const uint8_t* data = buffer.data(); int64_t compressed_size = buffer.size() - sizeof(int64_t); int64_t uncompressed_size = - arrow::bit_util::FromLittleEndian(arrow::util::SafeLoadAs(data)); + arrow::BitUtil::FromLittleEndian(arrow::util::SafeLoadAs(data)); ARROW_ASSIGN_OR_RAISE(auto uncompressed, AllocateBuffer(uncompressed_size, pool)); int64_t actual_decompressed; @@ -513,7 +513,7 @@ Status DecompressBuffersByType( continue; } // if the buffer has been rebuilt to uncompressed on java side, return - if (arrow::bit_util::GetBit(buf_mask, buffer_idx + i)) { + if (arrow::BitUtil::GetBit(buf_mask, buffer_idx + i)) { continue; } if (buffer->size() < 8) { @@ -570,7 +570,7 @@ arrow::Status DecompressBuffers( return arrow::Status::OK(); } // if the buffer has been rebuilt to uncompressed on java side, return - if (arrow::bit_util::GetBit(buf_mask, i)) { + if (arrow::BitUtil::GetBit(buf_mask, i)) { ARROW_ASSIGN_OR_RAISE(auto valid_copy, buffers[i]->CopySlice(0, buffers[i]->size())); buffers[i] = valid_copy; diff --git a/native-sql-engine/cpp/src/jni/jni_wrapper.cc b/native-sql-engine/cpp/src/jni/jni_wrapper.cc index 163a0b94c..b1bc66cdc 100644 --- a/native-sql-engine/cpp/src/jni/jni_wrapper.cc +++ b/native-sql-engine/cpp/src/jni/jni_wrapper.cc @@ -1464,7 +1464,8 @@ Java_com_intel_oap_vectorized_ArrowCoalesceBatchesJniWrapper_nativeCoalesceBatch for (const auto& batch : batches) { arrvec.push_back(batch->column(i)); } - std::shared_ptr bigArr = JniGetOrThrow(Concatenate(arrvec, pool)); + std::shared_ptr bigArr; + Concatenate(arrvec, pool, &bigArr); // ARROW_ASSIGN_OR_RAISE(auto bigArr, Concatenate(arrvec, pool)); arrayColumns.push_back(bigArr); } diff --git a/native-sql-engine/cpp/src/operators/row_to_columnar_converter.cc b/native-sql-engine/cpp/src/operators/row_to_columnar_converter.cc index ec281581f..2cbcd618a 100644 --- a/native-sql-engine/cpp/src/operators/row_to_columnar_converter.cc +++ b/native-sql-engine/cpp/src/operators/row_to_columnar_converter.cc @@ -84,11 +84,11 @@ arrow::Status CreateArrayData(std::shared_ptr schema, int64_t num bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); if (is_null) { null_count++; - arrow::bit_util::SetBitTo(out_is_valid, position, false); + arrow::BitUtil::SetBitTo(out_is_valid, position, false); } else { bool value = *(bool*)(memory_address_ + offsets[position] + fieldOffset); - arrow::bit_util::SetBitTo(array_data, position, value); - arrow::bit_util::SetBitTo(out_is_valid, position, true); + arrow::BitUtil::SetBitTo(array_data, position, value); + arrow::BitUtil::SetBitTo(out_is_valid, position, true); } position++; } @@ -117,12 +117,12 @@ arrow::Status CreateArrayData(std::shared_ptr schema, int64_t num bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); if (is_null) { null_count++; - arrow::bit_util::SetBitTo(out_is_valid, position, false); + arrow::BitUtil::SetBitTo(out_is_valid, position, false); array_data[position] = arrow::TypeTraits::CType{}; } else { auto value = *(int8_t*)(memory_address_ + offsets[position] + fieldOffset); array_data[position] = value; - arrow::bit_util::SetBitTo(out_is_valid, position, true); + arrow::BitUtil::SetBitTo(out_is_valid, position, true); } position++; } @@ -151,12 +151,12 @@ arrow::Status CreateArrayData(std::shared_ptr schema, int64_t num bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); if (is_null) { null_count++; - arrow::bit_util::SetBitTo(out_is_valid, position, false); + arrow::BitUtil::SetBitTo(out_is_valid, position, false); array_data[position] = arrow::TypeTraits::CType{}; } else { auto value = *(int16_t*)(memory_address_ + offsets[position] + fieldOffset); array_data[position] = value; - arrow::bit_util::SetBitTo(out_is_valid, position, true); + arrow::BitUtil::SetBitTo(out_is_valid, position, true); } position++; } @@ -185,12 +185,12 @@ arrow::Status CreateArrayData(std::shared_ptr schema, int64_t num bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); if (is_null) { null_count++; - arrow::bit_util::SetBitTo(out_is_valid, position, false); + arrow::BitUtil::SetBitTo(out_is_valid, position, false); array_data[position] = arrow::TypeTraits::CType{}; } else { auto value = *(int32_t*)(memory_address_ + offsets[position] + fieldOffset); array_data[position] = value; - arrow::bit_util::SetBitTo(out_is_valid, position, true); + arrow::BitUtil::SetBitTo(out_is_valid, position, true); } position++; } @@ -218,12 +218,12 @@ arrow::Status CreateArrayData(std::shared_ptr schema, int64_t num bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); if (is_null) { null_count++; - arrow::bit_util::SetBitTo(out_is_valid, position, false); + arrow::BitUtil::SetBitTo(out_is_valid, position, false); array_data[position] = arrow::TypeTraits::CType{}; } else { auto value = *(int64_t*)(memory_address_ + offsets[position] + fieldOffset); array_data[position] = value; - arrow::bit_util::SetBitTo(out_is_valid, position, true); + arrow::BitUtil::SetBitTo(out_is_valid, position, true); } position++; } @@ -251,12 +251,12 @@ arrow::Status CreateArrayData(std::shared_ptr schema, int64_t num bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); if (is_null) { null_count++; - arrow::bit_util::SetBitTo(out_is_valid, position, false); + arrow::BitUtil::SetBitTo(out_is_valid, position, false); array_data[position] = arrow::TypeTraits::CType{}; } else { auto value = *(float*)(memory_address_ + offsets[position] + fieldOffset); array_data[position] = value; - arrow::bit_util::SetBitTo(out_is_valid, position, true); + arrow::BitUtil::SetBitTo(out_is_valid, position, true); } position++; } @@ -284,12 +284,12 @@ arrow::Status CreateArrayData(std::shared_ptr schema, int64_t num bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); if (is_null) { null_count++; - arrow::bit_util::SetBitTo(out_is_valid, position, false); + arrow::BitUtil::SetBitTo(out_is_valid, position, false); array_data[position] = arrow::TypeTraits::CType{}; } else { auto value = *(double*)(memory_address_ + offsets[position] + fieldOffset); array_data[position] = value; - arrow::bit_util::SetBitTo(out_is_valid, position, true); + arrow::BitUtil::SetBitTo(out_is_valid, position, true); } position++; } @@ -371,10 +371,10 @@ arrow::Status CreateArrayData(std::shared_ptr schema, int64_t num bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); if (is_null) { null_count++; - arrow::bit_util::SetBitTo(out_is_valid, position, false); + arrow::BitUtil::SetBitTo(out_is_valid, position, false); array_data[position] = arrow::Decimal128{}; } else { - arrow::bit_util::SetBitTo(out_is_valid, position, true); + arrow::BitUtil::SetBitTo(out_is_valid, position, true); if (precision <= 18) { int64_t low_value; memcpy(&low_value, memory_address_ + offsets[position] + fieldOffset, 8); @@ -429,12 +429,12 @@ arrow::Status CreateArrayData(std::shared_ptr schema, int64_t num bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); if (is_null) { null_count++; - arrow::bit_util::SetBitTo(out_is_valid, position, false); + arrow::BitUtil::SetBitTo(out_is_valid, position, false); array_data[position] = arrow::TypeTraits::CType{}; } else { auto value = *(int32_t*)(memory_address_ + offsets[position] + fieldOffset); array_data[position] = value; - arrow::bit_util::SetBitTo(out_is_valid, position, true); + arrow::BitUtil::SetBitTo(out_is_valid, position, true); } position++; } @@ -462,12 +462,12 @@ arrow::Status CreateArrayData(std::shared_ptr schema, int64_t num bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); if (is_null) { null_count++; - arrow::bit_util::SetBitTo(out_is_valid, position, false); + arrow::BitUtil::SetBitTo(out_is_valid, position, false); array_data[position] = arrow::TypeTraits::CType{}; } else { auto value = *(int64_t*)(memory_address_ + offsets[position] + fieldOffset); array_data[position] = value; - arrow::bit_util::SetBitTo(out_is_valid, position, true); + arrow::BitUtil::SetBitTo(out_is_valid, position, true); } position++; } diff --git a/native-sql-engine/cpp/src/shuffle/splitter.cc b/native-sql-engine/cpp/src/shuffle/splitter.cc index a4173f02b..f49789d2c 100644 --- a/native-sql-engine/cpp/src/shuffle/splitter.cc +++ b/native-sql-engine/cpp/src/shuffle/splitter.cc @@ -627,7 +627,7 @@ arrow::Status Splitter::AllocatePartitionBuffers(int32_t partition_id, int32_t n std::shared_ptr value_buffer; if (column_type_id_[i]->id() == arrow::BooleanType::type_id) { ARROW_ASSIGN_OR_RAISE(value_buffer, arrow::AllocateResizableBuffer( - arrow::bit_util::BytesForBits(new_size), + arrow::BitUtil::BytesForBits(new_size), options_.memory_pool)); } else { ARROW_ASSIGN_OR_RAISE( @@ -641,7 +641,7 @@ arrow::Status Splitter::AllocatePartitionBuffers(int32_t partition_id, int32_t n std::shared_ptr validity_buffer; ARROW_ASSIGN_OR_RAISE( validity_buffer, - arrow::AllocateResizableBuffer(arrow::bit_util::BytesForBits(new_size), + arrow::AllocateResizableBuffer(arrow::BitUtil::BytesForBits(new_size), options_.memory_pool)); new_validity_buffers.push_back(std::move(validity_buffer)); } else { @@ -1119,8 +1119,8 @@ arrow::Status Splitter::SplitFixedWidthValidityBuffer(const arrow::RecordBatch& if (rb.column_data(col_idx)->GetNullCount() == 0) { for (auto pid = 0; pid < num_partitions_; ++pid) { if (partition_id_cnt_[pid] > 0 && dst_addrs[pid] != nullptr) { - arrow::bit_util::SetBitsTo(dst_addrs[pid], partition_buffer_idx_base_[pid], - partition_id_cnt_[pid], true); + arrow::BitUtil::SetBitsTo(dst_addrs[pid], partition_buffer_idx_base_[pid], + partition_id_cnt_[pid], true); } } } else { @@ -1132,11 +1132,11 @@ arrow::Status Splitter::SplitFixedWidthValidityBuffer(const arrow::RecordBatch& : options_.buffer_size; ARROW_ASSIGN_OR_RAISE( auto validity_buffer, - arrow::AllocateResizableBuffer(arrow::bit_util::BytesForBits(new_size), + arrow::AllocateResizableBuffer(arrow::BitUtil::BytesForBits(new_size), options_.memory_pool)); dst_addrs[pid] = const_cast(validity_buffer->data()); - arrow::bit_util::SetBitsTo(dst_addrs[pid], 0, partition_buffer_idx_base_[pid], - true); + arrow::BitUtil::SetBitsTo(dst_addrs[pid], 0, partition_buffer_idx_base_[pid], + true); partition_fixed_width_buffers_[col][pid][0] = std::move(validity_buffer); } } diff --git a/native-sql-engine/cpp/src/tests/arrow_compute_test_aggregate.cc b/native-sql-engine/cpp/src/tests/arrow_compute_test_aggregate.cc index 2e1a4d81a..606e8f84f 100644 --- a/native-sql-engine/cpp/src/tests/arrow_compute_test_aggregate.cc +++ b/native-sql-engine/cpp/src/tests/arrow_compute_test_aggregate.cc @@ -145,17 +145,9 @@ TEST(TestArrowCompute, AggregateTest) { std::shared_ptr expected_result; std::shared_ptr result_batch; - std::vector expected_result_string = {"[221]", - "[39]", - "[221]", - "[39]", - "[4.407239819004525]", - "[1]", - "[10]", - "[17.299580526332743]", - "[40]", - R"(["AU"])", - R"(["wH"])"}; + std::vector expected_result_string = { + "[221]", "[39]", "[221]", "[39]", "[4.40724]", "[1]", + "[10]", "[17.2996]", "[40]", R"(["AU"])", R"(["wH"])"}; auto res_sch = arrow::schema(ret_types); MakeInputBatch(expected_result_string, res_sch, &expected_result); if (aggr_result_iterator->HasNext()) { @@ -478,14 +470,11 @@ TEST(TestArrowCompute, GroupByAggregateTest) { "[1, 2, 3, 4, 5, null, 6, 7, 8, 9, 10]", "[25, 18, 12, 64, 125, 5, 150, 63, 32, 144, 360]", "[1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10]", - "[16.4, 6.5, 5, 5.875, 5.48, null, 6.1, 6.619047619047619, 3.0625, " - "2.638888888888889, " - "2.066666666666667]", - "[8.492553019565051, 6.931367805067252, 7.648904962570575, 13.570764059328763, " - "17.466813592401582, " - "1.4142135623730951, 8.527788283128372, 6.236331106499947, " - "5.589028976303414, " - "12.535039355587863, 24.354402067771613]"}; + "[16.4, 6.5, 5, 5.875, 5.48, null, 6.1, 6.61905, 3.0625, 2.63889, " + "2.06667]", + "[8.49255, 6.93137, 7.6489, 13.5708, 17.4668, 1.41421, 8.52779, 6.23633, " + "5.58903, " + "12.535, 24.3544]"}; auto res_sch = arrow::schema(ret_types); MakeInputBatch(expected_result_string, res_sch, &expected_result); if (aggr_result_iterator->HasNext()) { @@ -973,14 +962,11 @@ TEST(TestArrowCompute, GroupByTwoAggregateTest) { "[5, 3, 2, 4, 5, 1, 5, 3, 2, 4, 6]", "[1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10]", "[1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10]", - "[16.4, 6.5, 5, 5.875, 5.48, null, 6.1, 6.619047619047619, 3.0625, " - "2.638888888888889, " - "2.066666666666667]", - "[8.492553019565051, 6.931367805067252, 7.648904962570575, 13.570764059328763, " - "17.466813592401582, " - "1.4142135623730951, 8.527788283128372, 6.236331106499947, " - "5.589028976303414, " - "12.535039355587863, 24.354402067771613]"}; + "[16.4, 6.5, 5, 5.875, 5.48, null, 6.1, 6.61905, 3.0625, 2.63889, " + "2.06667]", + "[8.49255, 6.93137, 7.6489, 13.5708, 17.4668, 1.41421, 8.52779, 6.23633, " + "5.58903, " + "12.535, 24.3544]"}; auto res_sch = arrow::schema(ret_types); MakeInputBatch(expected_result_string, res_sch, &expected_result); if (aggr_result_iterator->HasNext()) { diff --git a/native-sql-engine/cpp/src/tests/arrow_compute_test_aggregate_decimal.cc b/native-sql-engine/cpp/src/tests/arrow_compute_test_aggregate_decimal.cc index ac9113d5b..089c9729f 100644 --- a/native-sql-engine/cpp/src/tests/arrow_compute_test_aggregate_decimal.cc +++ b/native-sql-engine/cpp/src/tests/arrow_compute_test_aggregate_decimal.cc @@ -144,8 +144,8 @@ TEST(TestArrowCompute, AggregateTest) { "[785]", R"(["0.439825"])", R"([39])", - R"([8.852881974358972])", - R"([11113.292918619738])"}; + R"([8.85288])", + R"([11113.3])"}; auto res_sch = arrow::schema(ret_types); MakeInputBatch(expected_result_string, res_sch, &expected_result); if (aggr_result_iterator->HasNext()) { diff --git a/native-sql-engine/cpp/src/tests/arrow_compute_test_window.cc b/native-sql-engine/cpp/src/tests/arrow_compute_test_window.cc index b29ad7590..4dcbcaa66 100644 --- a/native-sql-engine/cpp/src/tests/arrow_compute_test_window.cc +++ b/native-sql-engine/cpp/src/tests/arrow_compute_test_window.cc @@ -64,8 +64,7 @@ TEST(TestArrowComputeWindow, DoubleTest) { ASSERT_NOT_OK(expr->finish(&out)) std::shared_ptr expected_result; - std::vector expected_output_data = { - "[118.27600000000001, 37.244, 118.27600000000001]"}; + std::vector expected_output_data = {"[118.276, 37.244, 118.276]"}; MakeInputBatch(expected_output_data, arrow::schema({res}), &expected_result); ASSERT_NOT_OK(Equals(*expected_result.get(), *(out.at(0).get()))); diff --git a/native-sql-engine/cpp/src/tests/arrow_compute_test_wscg.cc b/native-sql-engine/cpp/src/tests/arrow_compute_test_wscg.cc index b262034c1..4deb695a0 100644 --- a/native-sql-engine/cpp/src/tests/arrow_compute_test_wscg.cc +++ b/native-sql-engine/cpp/src/tests/arrow_compute_test_wscg.cc @@ -4299,14 +4299,9 @@ TEST(TestArrowComputeWSCG, WSCGTestAggregate) { std::shared_ptr expected_result; std::shared_ptr result_batch; - std::vector expected_result_string = {"[221]", - "[39]", - "[221]", - "[39]", - "[4.3097345132743365]", - "[17.299580526332743]", - R"(["AU"])", - R"(["wH"])"}; + std::vector expected_result_string = { + "[221]", "[39]", "[221]", "[39]", + "[4.30973]", "[17.2996]", R"(["AU"])", R"(["wH"])"}; MakeInputBatch(expected_result_string, arrow::schema(ret_types), &expected_result); if (aggr_result_iterator->HasNext()) { ASSERT_NOT_OK(aggr_result_iterator->Next(&result_batch)); @@ -4506,13 +4501,11 @@ TEST(TestArrowComputeWSCG, WSCGTestGroupbyHashAggregateTwoKeys) { "[5, 3, 2, 4, 5, 1, 5, 3, 2, 4, 6]", "[1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10]", "[1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10]", - "[16.4, 6.5, 5, 5.875, 5.48, 0.4, 6.1, 6.619047619047619, 3.0625, " - "2.638888888888889, " - "2.066666666666667]", - "[8.492553019565051, 6.931367805067252, 7.648904962570575, 13.570764059328763, " - "17.466813592401582, 1.4142135623730951, 8.527788283128372, 6.236331106499947, " - "5.589028976303414, " - "12.535039355587863, 24.354402067771613]"}; + "[16.4, 6.5, 5, 5.875, 5.48, 0.4, 6.1, 6.61905, 3.0625, 2.63889, " + "2.06667]", + "[8.49255, 6.93137, 7.6489, 13.5708, 17.4668, 1.41421, 8.52779, 6.23633, " + "5.58903, " + "12.535, 24.3544]"}; auto res_sch = arrow::schema(ret_types); MakeInputBatch(expected_result_string, res_sch, &expected_result); if (aggr_result_iterator->HasNext()) { @@ -4636,13 +4629,11 @@ TEST(TestArrowComputeWSCG, WSCGTestGroupbyHashAggregate) { "[1, 2, 3, 4, 5, null, 6, 7, 8, 9, 10]", "[25, 18, 12, 64, 125, 5, 150, 63, 32, 144, 360]", "[1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10]", - "[16.4, 6.5, 5, 5.875, 5.48, 0.4, 6.1, 6.619047619047619, 3.0625, " - "2.638888888888889, " - "2.066666666666667]", - "[8.492553019565051, 6.931367805067252, 7.648904962570575, 13.570764059328763, " - "17.466813592401582, 1.4142135623730951, 8.527788283128372, 6.236331106499947, " - "5.589028976303414, " - "12.535039355587863, 24.354402067771613]"}; + "[16.4, 6.5, 5, 5.875, 5.48, 0.4, 6.1, 6.61905, 3.0625, 2.63889, " + "2.06667]", + "[8.49255, 6.93137, 7.6489, 13.5708, 17.4668, 1.41421, 8.52779, 6.23633, " + "5.58903, " + "12.535, 24.3544]"}; auto res_sch = arrow::schema(ret_types); MakeInputBatch(expected_result_string, res_sch, &expected_result); if (aggr_result_iterator->HasNext()) { diff --git a/native-sql-engine/cpp/src/third_party/arrow/utils/hashing.h b/native-sql-engine/cpp/src/third_party/arrow/utils/hashing.h index 328d7e7ca..28c273fea 100644 --- a/native-sql-engine/cpp/src/third_party/arrow/utils/hashing.h +++ b/native-sql-engine/cpp/src/third_party/arrow/utils/hashing.h @@ -39,7 +39,6 @@ #include "arrow/type_traits.h" #include "arrow/util/bit_util.h" #include "arrow/util/bitmap_builders.h" -#include "arrow/util/endian.h" #include "arrow/util/logging.h" #include "arrow/util/macros.h" #include "arrow/util/ubsan.h" @@ -97,7 +96,7 @@ struct ScalarHelper::value> // then byte-swapping (which is a single CPU instruction) allows the // combined high and low bits to participate in the initial hash table index. auto h = static_cast(value); - return bit_util::ByteSwap(multipliers[AlgNum] * h); + return BitUtil::ByteSwap(multipliers[AlgNum] * h); } }; @@ -212,7 +211,7 @@ class HashTable { DCHECK_NE(pool, nullptr); // Minimum of 32 elements capacity = std::max(capacity, 32UL); - capacity_ = bit_util::NextPower2(capacity); + capacity_ = BitUtil::NextPower2(capacity); capacity_mask_ = capacity_ - 1; size_ = 0; @@ -329,7 +328,8 @@ class HashTable { // Stash old entries and seal builder, effectively resetting the Buffer const Entry* old_entries = entries_; - ARROW_ASSIGN_OR_RAISE(auto previous, entries_builder_.FinishWithLength(capacity_)); + std::shared_ptr previous; + RETURN_NOT_OK(entries_builder_.Finish(&previous)); // Allocate new buffer RETURN_NOT_OK(UpsizeBuffer(new_capacity)); @@ -460,13 +460,6 @@ class ScalarMemoTable : public MemoTable { out_data[index] = entry->payload.value; } }); - // Zero-initialize the null entry - if (null_index_ != kKeyNotFound) { - int32_t index = null_index_ - start; - if (index >= 0) { - out_data[index] = Scalar{}; - } - } } void CopyValues(Scalar* out_data) const { CopyValues(0, out_data); } @@ -781,8 +774,6 @@ class BinaryMemoTable : public MemoTable { if (left_size > 0) { memcpy(out_data, in_data + left_offset, left_size); } - // Zero-initialize the null entry - memset(out_data + left_size, 0, width_size); auto right_size = values_size() - static_cast(null_data_offset); if (right_size > 0) { diff --git a/pom.xml b/pom.xml index 7abc25b8a..fe7e0a2f7 100644 --- a/pom.xml +++ b/pom.xml @@ -132,7 +132,7 @@ ${java.version} 2.10.0 2.12 - 7.0.0-gazelle + 4.0.0 2.17.1 arrow-memory-unsafe ${hadoop.version}