Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
[NSE-304] Upgrade to Arrow 4.0.0 (#307)
Browse files Browse the repository at this point in the history
* [NSE-304] Upgrade to Arrow 4.0.0

* use .400

* .400

* fix

* fix
  • Loading branch information
zhztheplayer authored May 14, 2021
1 parent e60389d commit 0044b8c
Show file tree
Hide file tree
Showing 17 changed files with 110 additions and 119 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/unittests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ jobs:
run: |
cd /tmp
git clone https://github.com/oap-project/arrow.git
cd arrow && git checkout arrow-3.0.0-oap && cd cpp
cd arrow && git checkout arrow-4.0.0-oap && cd cpp
mkdir build && cd build
cmake .. -DARROW_JNI=ON -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_CSV=ON -DARROW_HDFS=ON -DARROW_FILESYSTEM=ON -DARROW_WITH_SNAPPY=ON -DARROW_JSON=ON -DARROW_DATASET=ON -DARROW_WITH_LZ4=ON -DGTEST_ROOT=/usr/src/gtest && make -j2
sudo make install
Expand Down Expand Up @@ -89,7 +89,7 @@ jobs:
run: |
cd /tmp
git clone https://github.com/oap-project/arrow.git
cd arrow && git checkout arrow-3.0.0-oap && cd cpp
cd arrow && git checkout arrow-4.0.0-oap && cd cpp
mkdir build && cd build
cmake .. -DARROW_JNI=ON -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_CSV=ON -DARROW_HDFS=ON -DARROW_FILESYSTEM=ON -DARROW_WITH_SNAPPY=ON -DARROW_JSON=ON -DARROW_DATASET=ON -DARROW_WITH_LZ4=ON -DGTEST_ROOT=/usr/src/gtest && make -j2
sudo make install
Expand Down
2 changes: 1 addition & 1 deletion arrow-data-source/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ You have to use a customized Arrow to support for our datasets Java API.

```
// build arrow-cpp
git clone -b arrow-3.0.0-oap-1.1 https://github.com/oap-project/arrow.git
git clone -b arrow-4.0.0-oap https://github.com/oap-project/arrow.git
cd arrow/cpp
mkdir build
cd build
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package com.intel.oap.spark.sql.execution.datasources.v2.arrow;

import org.apache.arrow.memory.ReservationListener;
import org.apache.arrow.dataset.jni.ReservationListener;

/**
* Reserve Spark managed memory.
Expand Down
26 changes: 2 additions & 24 deletions arrow-data-source/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,30 +49,8 @@
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
Expand Down
2 changes: 1 addition & 1 deletion arrow-data-source/script/build_arrow.sh
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ echo "ARROW_SOURCE_DIR=${ARROW_SOURCE_DIR}"
echo "ARROW_INSTALL_DIR=${ARROW_INSTALL_DIR}"
mkdir -p $ARROW_SOURCE_DIR
mkdir -p $ARROW_INSTALL_DIR
git clone https://github.com/oap-project/arrow.git --branch arrow-3.0.0-oap $ARROW_SOURCE_DIR
git clone https://github.com/oap-project/arrow.git --branch arrow-4.0.0-oap $ARROW_SOURCE_DIR
pushd $ARROW_SOURCE_DIR

cmake ./cpp \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class ArrowFileFormat extends FileFormat with DataSourceRegister with Serializab
.asScala
.toList
val itrList = taskList
.map(task => task.scan())
.map(task => task.execute())

Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => {
itrList.foreach(_.close())
Expand All @@ -120,7 +120,7 @@ class ArrowFileFormat extends FileFormat with DataSourceRegister with Serializab
val itr = itrList
.toIterator
.flatMap(itr => itr.asScala)
.map(vsr => ArrowUtils.loadVectors(vsr, file.partitionValues, partitionSchema,
.map(batch => ArrowUtils.loadBatch(batch, file.partitionValues, partitionSchema,
requiredSchema))
new UnsafeItr(itr).asInstanceOf[Iterator[InternalRow]]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ case class ArrowPartitionReaderFactory(
.toList

val vsrItrList = taskList
.map(task => task.scan())
.map(task => task.execute())

val batchItr = vsrItrList
.toIterator
.flatMap(itr => itr.asScala)
.map(bundledVectors => ArrowUtils.loadVectors(bundledVectors, partitionedFile.partitionValues,
.map(batch => ArrowUtils.loadBatch(batch, partitionedFile.partitionValues,
readPartitionSchema, readDataSchema))

new PartitionReader[ColumnarBatch] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,28 @@
package com.intel.oap.spark.sql.execution.datasources.v2.arrow

import java.net.URI
import java.util.TimeZone

import scala.collection.JavaConverters._

import com.intel.oap.vectorized.ArrowWritableColumnVector
import org.apache.arrow.dataset.file.SingleFileDatasetFactory
import org.apache.arrow.dataset.scanner.ScanTask
import org.apache.arrow.vector.FieldVector
import org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID
import org.apache.arrow.dataset.file.FileSystemDatasetFactory
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch
import org.apache.arrow.vector.types.pojo.Schema
import org.apache.hadoop.fs.FileStatus

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.v2.arrow.{SparkMemoryUtils, SparkSchemaUtils}
import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.sql.vectorized.ColumnVector
import org.apache.spark.sql.vectorized.ColumnarBatch

object ArrowUtils {

def readSchema(file: FileStatus, options: CaseInsensitiveStringMap): Option[StructType] = {
val factory: SingleFileDatasetFactory =
val factory: FileSystemDatasetFactory =
makeArrowDiscovery(file.getPath.toString, -1L, -1L,
new ArrowOptions(options.asScala.toMap))
val schema = factory.inspect()
Expand All @@ -65,11 +64,11 @@ object ArrowUtils {
}

def makeArrowDiscovery(file: String, startOffset: Long, length: Long,
options: ArrowOptions): SingleFileDatasetFactory = {
options: ArrowOptions): FileSystemDatasetFactory = {

val format = getFormat(options).getOrElse(throw new IllegalStateException)
val allocator = SparkMemoryUtils.contextAllocator()
val factory = new SingleFileDatasetFactory(allocator,
val factory = new FileSystemDatasetFactory(allocator,
SparkMemoryUtils.contextMemoryPool(),
format,
rewriteUri(file),
Expand All @@ -80,72 +79,32 @@ object ArrowUtils {

def toArrowSchema(t: StructType): Schema = {
// fixme this might be platform dependent
SparkSchemaUtils.toArrowSchema(t, TimeZone.getDefault.getID)
SparkSchemaUtils.toArrowSchema(t, SQLConf.get.sessionLocalTimeZone)
}

def loadVectors(bundledVectors: ScanTask.ArrowBundledVectors, partitionValues: InternalRow,
def loadBatch(input: ArrowRecordBatch, partitionValues: InternalRow,
partitionSchema: StructType, dataSchema: StructType): ColumnarBatch = {
val rowCount: Int = getRowCount(bundledVectors)
val dataVectors = getDataVectors(bundledVectors, dataSchema)
val dictionaryVectors = getDictionaryVectors(bundledVectors, dataSchema)
val rowCount: Int = input.getLength

val vectors = ArrowWritableColumnVector.loadColumns(rowCount, dataVectors.asJava,
dictionaryVectors.asJava)
val vectors = try {
ArrowWritableColumnVector.loadColumns(rowCount, toArrowSchema(dataSchema), input)
} finally {
input.close()
}
val partitionColumns = ArrowWritableColumnVector.allocateColumns(rowCount, partitionSchema)
(0 until partitionColumns.length).foreach(i => {
ColumnVectorUtils.populate(partitionColumns(i), partitionValues, i)
partitionColumns(i).setValueCount(rowCount)
partitionColumns(i).setIsConstant()
})

val batch = new ColumnarBatch(vectors ++ partitionColumns, rowCount)
val batch = new ColumnarBatch(
vectors.map(_.asInstanceOf[ColumnVector]) ++
partitionColumns.map(_.asInstanceOf[ColumnVector]),
rowCount)
batch
}

private def getRowCount(bundledVectors: ScanTask.ArrowBundledVectors) = {
val valueVectors = bundledVectors.valueVectors
val rowCount = valueVectors.getRowCount
rowCount
}

private def getDataVectors(bundledVectors: ScanTask.ArrowBundledVectors,
dataSchema: StructType): List[FieldVector] = {
// TODO Deprecate following (bad performance maybe brought).
// TODO Assert vsr strictly matches dataSchema instead.
val valueVectors = bundledVectors.valueVectors
dataSchema.map(f => {
val vector = valueVectors.getVector(f.name)
if (vector == null) {
throw new IllegalStateException("Error: no vector named " + f.name + " in record bach")
}
vector
}).toList
}

private def getDictionaryVectors(bundledVectors: ScanTask.ArrowBundledVectors,
dataSchema: StructType): List[FieldVector] = {
val valueVectors = bundledVectors.valueVectors
val dictionaryVectorMap = bundledVectors.dictionaryVectors

val fieldNameToDictionaryEncoding = valueVectors.getSchema.getFields.asScala.map(f => {
f.getName -> f.getDictionary
}).toMap

val dictionaryVectorsWithNulls = dataSchema.map(f => {
val de = fieldNameToDictionaryEncoding(f.name)

Option(de) match {
case None => null
case _ =>
if (de.getIndexType.getTypeID != ArrowTypeID.Int) {
throw new IllegalArgumentException("Wrong index type: " + de.getIndexType)
}
dictionaryVectorMap.get(de.getId).getVector
}
}).toList
dictionaryVectorsWithNulls
}

private def getFormat(
options: ArrowOptions): Option[org.apache.arrow.dataset.file.FileFormat] = {
Option(options.originalFormat match {
Expand Down
2 changes: 1 addition & 1 deletion docs/ApacheArrowInstallation.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ Please make sure your cmake version is qualified based on the prerequisite.
# Arrow
``` shell
git clone https://github.com/oap-project/arrow.git
cd arrow && git checkout arrow-3.0.0-oap-1.1
cd arrow && git checkout arrow-4.0.0-oap
mkdir -p arrow/cpp/release-build
cd arrow/cpp/release-build
cmake -DARROW_DEPENDENCY_SOURCE=BUNDLED -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_CSV=ON -DARROW_HDFS=ON -DARROW_BOOST_USE_SHARED=ON -DARROW_JNI=ON -DARROW_DATASET=ON -DARROW_WITH_PROTOBUF=ON -DARROW_WITH_SNAPPY=ON -DARROW_WITH_LZ4=ON -DARROW_FILESYSTEM=ON -DARROW_JSON=ON ..
Expand Down
2 changes: 1 addition & 1 deletion docs/OAP-Developer-Guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ Then the dependencies below will be installed:
* [HPNL](https://github.com/Intel-bigdata/HPNL)
* [PMDK](https://github.com/pmem/pmdk)
* [OneAPI](https://software.intel.com/content/www/us/en/develop/tools/oneapi.html)
* [Arrow](https://github.com/oap-project/arrow/tree/arrow-3.0.0-oap-1.1)
* [Arrow](https://github.com/oap-project/arrow/tree/arrow-4.0.0-oap)
* [LLVM](https://llvm.org/)

Run the following command to learn more.
Expand Down
2 changes: 1 addition & 1 deletion docs/OAP-Installation-Guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ Once finished steps above, you have completed OAP dependencies installation and

Dependencies below are required by OAP and all of them are included in OAP Conda package, they will be automatically installed in your cluster when you Conda install OAP. Ensure you have activated environment which you created in the previous steps.

- [Arrow](https://github.com/oap-project/arrow/tree/arrow-3.0.0-oap-1.1)
- [Arrow](https://github.com/oap-project/arrow/tree/arrow-4.0.0-oap)
- [Plasma](http://arrow.apache.org/blog/2017/08/08/plasma-in-memory-object-store/)
- [Memkind](https://anaconda.org/intel/memkind)
- [Vmemcache](https://anaconda.org/intel/vmemcache)
Expand Down
17 changes: 9 additions & 8 deletions native-sql-engine/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
<version>3.0.0</version>
<version>${arrow.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
Expand Down Expand Up @@ -134,11 +134,6 @@
<groupId>com.intel.oap</groupId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.google.flatbuffers</groupId>
<artifactId>flatbuffers-java</artifactId>
<version>1.9.0</version>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
Expand Down Expand Up @@ -259,8 +254,14 @@
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>${hadoop.version}</version>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.intel.oap</groupId>
<artifactId>spark-arrow-datasource-standard</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
/** Helper class for JNI related operations. */
public class JniUtils {
private static final String LIBRARY_NAME = "spark_columnar_jni";
private static final String ARROW_LIBRARY_NAME = "libarrow.so.300";
private static final String GANDIVA_LIBRARY_NAME = "libgandiva.so.300";
private static final String ARROW_LIBRARY_NAME = "libarrow.so.400";
private static final String GANDIVA_LIBRARY_NAME = "libgandiva.so.400";
private static boolean isLoaded = false;
private static boolean isCodegenDependencyLoaded = false;
private static List<String> codegenJarsLoadedCache = new ArrayList<>();
Expand Down
20 changes: 9 additions & 11 deletions native-sql-engine/cpp/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ macro(build_arrow STATIC_ARROW)
ExternalProject_Add(arrow_ep
GIT_REPOSITORY https://github.com/oap-project/arrow.git
SOURCE_DIR ${ARROW_SOURCE_DIR}
GIT_TAG arrow-3.0.0-oap
GIT_TAG arrow-4.0.0-oap
BUILD_IN_SOURCE 1
INSTALL_DIR ${ARROW_PREFIX}
INSTALL_COMMAND make install
Expand Down Expand Up @@ -216,15 +216,15 @@ macro(build_arrow STATIC_ARROW)
)

ExternalProject_Add_Step(arrow_ep copy_arrow_binary_300
COMMAND cp -a ${ARROW_PREFIX}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}${ARROW_LIB_NAME}${ARROW_SHARED_LIBRARY_SUFFIX}.300 ${root_directory}/releases/
COMMENT "Copy libarrow.so.300 to releases/"
COMMAND cp -a ${ARROW_PREFIX}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}${ARROW_LIB_NAME}${ARROW_SHARED_LIBRARY_SUFFIX}.400 ${root_directory}/releases/
COMMENT "Copy libarrow.so.400 to releases/"
DEPENDEES mkdir download update patch configure build install java_install
WORKING_DIRECTORY "${ARROW_PREFIX}/"
)

ExternalProject_Add_Step(arrow_ep copy_arrow_binary_300_0_0
COMMAND cp -a ${ARROW_PREFIX}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}${ARROW_LIB_NAME}${ARROW_SHARED_LIBRARY_SUFFIX}.300.0.0 ${root_directory}/releases/
COMMENT "Copy libarrow.so.300.0.0 to releases/"
COMMAND cp -a ${ARROW_PREFIX}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}${ARROW_LIB_NAME}${ARROW_SHARED_LIBRARY_SUFFIX}.400.0.0 ${root_directory}/releases/
COMMENT "Copy libarrow.so.400.0.0 to releases/"
DEPENDEES mkdir download update patch configure build install java_install
WORKING_DIRECTORY "${ARROW_PREFIX}/"
)
Expand All @@ -239,15 +239,15 @@ macro(build_arrow STATIC_ARROW)
)

ExternalProject_Add_Step(arrow_ep copy_gandiva_binary_300
COMMAND cp -a ${ARROW_PREFIX}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}${GANDIVA_LIB_NAME}${ARROW_SHARED_LIBRARY_SUFFIX}.300 ${root_directory}/releases/
COMMENT "Copy libgandiva.so.300 to releases/"
COMMAND cp -a ${ARROW_PREFIX}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}${GANDIVA_LIB_NAME}${ARROW_SHARED_LIBRARY_SUFFIX}.400 ${root_directory}/releases/
COMMENT "Copy libgandiva.so.400 to releases/"
DEPENDEES mkdir download update patch configure build install java_install
WORKING_DIRECTORY "${ARROW_PREFIX}/"
)

ExternalProject_Add_Step(arrow_ep copy_gandiva_binary_300_0_0
COMMAND cp -a ${ARROW_PREFIX}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}${GANDIVA_LIB_NAME}${ARROW_SHARED_LIBRARY_SUFFIX}.300.0.0 ${root_directory}/releases/
COMMENT "Copy libgandiva.so.300.0.0 to releases/"
COMMAND cp -a ${ARROW_PREFIX}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}${GANDIVA_LIB_NAME}${ARROW_SHARED_LIBRARY_SUFFIX}.400.0.0 ${root_directory}/releases/
COMMENT "Copy libgandiva.so.400.0.0 to releases/"
DEPENDEES mkdir download update patch configure build install java_install
WORKING_DIRECTORY "${ARROW_PREFIX}/"
)
Expand Down Expand Up @@ -321,13 +321,11 @@ macro(find_arrow)
message(STATUS "COPY and Set Arrow Header to: ${ARROW_BFS_INCLUDE_DIR}")
file(COPY ${ARROW_BFS_INCLUDE_DIR}/arrow DESTINATION ${root_directory}/releases/include)
file(COPY ${ARROW_BFS_INCLUDE_DIR}/gandiva DESTINATION ${root_directory}/releases/include)
file(COPY ${ARROW_BFS_INCLUDE_DIR}/jni DESTINATION ${root_directory}/releases/include)
file(COPY ${ARROW_BFS_INCLUDE_DIR}/parquet DESTINATION ${root_directory}/releases/include)
else()
message(STATUS "COPY and Set Arrow Header to: ${ARROW_INCLUDE_DIR}")
file(COPY ${ARROW_INCLUDE_DIR}/arrow DESTINATION ${root_directory}/releases/include)
file(COPY ${ARROW_INCLUDE_DIR}/gandiva DESTINATION ${root_directory}/releases/include)
file(COPY ${ARROW_INCLUDE_DIR}/jni DESTINATION ${root_directory}/releases/include)
file(COPY ${ARROW_INCLUDE_DIR}/parquet DESTINATION ${root_directory}/releases/include)
endif()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,9 @@ class EncodeArrayKernel : public KernalBase {
arrow::Status Evaluate(const std::shared_ptr<arrow::Array>& in,
std::shared_ptr<arrow::Array>* out) override;

private:
class Impl;

private:
std::unique_ptr<Impl> impl_;
arrow::compute::ExecContext* ctx_ = nullptr;
};
Expand Down
Loading

0 comments on commit 0044b8c

Please sign in to comment.