This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-273] backport spark311 support to 1.1.1 branch (#334)
* [NSE-304] Upgrade to Arrow 4.0.0 (#307)

* [NSE-304] Upgrade to Arrow 4.0.0

* use .400

* .400

* fix

* fix

* [NSE-318] check schema before reuse exchange (#328) — a sketch of this check follows the change summary below

* check schema before reuse exchange

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* remove debug log

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* bump version to 1.1.1

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* update arrow branch to use arrow-4.0.0-oap-1.1.1

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* update GHA arrow branch

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* [NSE-329] fix output partitioning in BHJ and SHJ (#335) — a sketch of the idea follows the change summary below

* fix output partitioning for SHJ and BHJ

* refine

* [NSE-325] fix incremental compile issue with 4.5.x scala-maven-plugin (#332)

The new release of scala-maven-plugin (4.5.x) appears to be buggy with incremental compilation, so 4.4.0 is used instead.

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

Co-authored-by: Hongze Zhang <hongze.zhang@intel.com>
Co-authored-by: Rui Mo <rui.mo@intel.com>
3 people authored May 20, 2021
1 parent 2b69e5d commit 46c0c99
Showing 25 changed files with 940 additions and 130 deletions.
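
The two sketches below illustrate, in rough form, the ideas behind the NSE-318 and NSE-329 items above; they are not the actual patches, and every class, object, and helper name in them is a hypothetical stand-in.

For NSE-318, the point is that an exchange should only be reused when the candidate's output schema matches exactly — two plans can compare equal after canonicalization yet expose different column types or nullability to the columnar operators above them:

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.Exchange

// Hypothetical sketch of the NSE-318 check: reuse a cached exchange only when
// both the canonicalized plan and the exact output schema match.
object ReuseExchangeWithSchemaCheck {
  private def sameSchema(a: Seq[Attribute], b: Seq[Attribute]): Boolean =
    a.length == b.length && a.zip(b).forall { case (x, y) =>
      x.dataType == y.dataType && x.nullable == y.nullable
    }

  def tryReuse(current: Exchange, cached: Seq[Exchange]): SparkPlan =
    cached
      .find(c => c.sameResult(current) && sameSchema(c.output, current.output))
      .getOrElse(current)
}
```

For NSE-329, the direction is that the columnar broadcast and shuffled hash joins should report an output partitioning derived from their children (mirroring the convention of Spark's own shuffled joins) rather than an unknown partitioning, so the planner neither adds redundant shuffles nor drops required ones:

```scala
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection}

// Hedged sketch, not the actual NSE-329 patch: derive a join's output
// partitioning from its children's partitionings and the join type.
def joinOutputPartitioning(
    joinType: JoinType,
    left: Partitioning,
    right: Partitioning): Partitioning = joinType match {
  case _: InnerLike => PartitioningCollection(Seq(left, right))
  case LeftOuter | LeftSemi | LeftAnti => left
  case RightOuter => right
  case _ => left // real operators handle the remaining join types explicitly
}
```
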
2 changes: 1 addition & 1 deletion .github/workflows/tpch.yml
@@ -51,7 +51,7 @@ jobs:
run: |
cd /tmp
git clone https://github.com/oap-project/arrow.git
-cd arrow && git checkout arrow-4.0.0-oap && cd cpp
+cd arrow && git checkout arrow-4.0.0-oap-1.1.1 && cd cpp
mkdir build && cd build
cmake .. -DARROW_JNI=ON -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_CSV=ON -DARROW_HDFS=ON -DARROW_FILESYSTEM=ON -DARROW_WITH_SNAPPY=ON -DARROW_JSON=ON -DARROW_DATASET=ON -DARROW_WITH_LZ4=ON -DARROW_JEMALLOC=OFF && make -j2
sudo make install
4 changes: 2 additions & 2 deletions .github/workflows/unittests.yml
@@ -45,7 +45,7 @@ jobs:
run: |
cd /tmp
git clone https://github.com/oap-project/arrow.git
-cd arrow && git checkout arrow-3.0.0-oap && cd cpp
+cd arrow && git checkout arrow-4.0.0-oap-1.1.1 && cd cpp
mkdir build && cd build
cmake .. -DARROW_JNI=ON -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_CSV=ON -DARROW_HDFS=ON -DARROW_FILESYSTEM=ON -DARROW_WITH_SNAPPY=ON -DARROW_JSON=ON -DARROW_DATASET=ON -DARROW_WITH_LZ4=ON -DGTEST_ROOT=/usr/src/gtest && make -j2
sudo make install
@@ -89,7 +89,7 @@
run: |
cd /tmp
git clone https://github.com/oap-project/arrow.git
-cd arrow && git checkout arrow-3.0.0-oap && cd cpp
+cd arrow && git checkout arrow-4.0.0-oap-1.1.1 && cd cpp
mkdir build && cd build
cmake .. -DARROW_JNI=ON -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_CSV=ON -DARROW_HDFS=ON -DARROW_FILESYSTEM=ON -DARROW_WITH_SNAPPY=ON -DARROW_JSON=ON -DARROW_DATASET=ON -DARROW_WITH_LZ4=ON -DGTEST_ROOT=/usr/src/gtest && make -j2
sudo make install
2 changes: 1 addition & 1 deletion arrow-data-source/README.md
@@ -117,7 +117,7 @@ You have to use a customized Arrow to support for our datasets Java API.

```
// build arrow-cpp
-git clone -b arrow-3.0.0-oap-1.1 https://github.com/oap-project/arrow.git
+git clone -b arrow-4.0.0-oap-1.1.1 https://github.com/oap-project/arrow.git
cd arrow/cpp
mkdir build
cd build
2 changes: 1 addition & 1 deletion arrow-data-source/common/pom.xml
@@ -5,7 +5,7 @@
<parent>
<artifactId>spark-arrow-datasource</artifactId>
<groupId>com.intel.oap</groupId>
-<version>1.1.0</version>
+<version>1.1.1</version>
</parent>

<modelVersion>4.0.0</modelVersion>
@@ -17,7 +17,7 @@

package com.intel.oap.spark.sql.execution.datasources.v2.arrow;

-import org.apache.arrow.memory.ReservationListener;
+import org.apache.arrow.dataset.jni.ReservationListener;

/**
* Reserve Spark managed memory.
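
The only change in this Java source is the import: with the Arrow 4.0.0 upgrade, `ReservationListener` lives in the dataset JNI package instead of `org.apache.arrow.memory`. As a rough sketch of what such a listener does — assuming the interface exposes `reserve(long)`/`unreserve(long)` callbacks as in the Arrow 4.x dataset JNI module, and using a purely illustrative byte budget rather than the plugin's real Spark-managed accounting:

```scala
import java.util.concurrent.atomic.AtomicLong

import org.apache.arrow.dataset.jni.ReservationListener

// Illustrative only: track native dataset reservations against a fixed budget.
// The listener in this file instead delegates to Spark's memory management.
class BudgetedReservationListener(maxBytes: Long) extends ReservationListener {
  private val reserved = new AtomicLong(0L)

  override def reserve(size: Long): Unit = {
    val total = reserved.addAndGet(size)
    if (total > maxBytes) {
      reserved.addAndGet(-size)
      throw new IllegalStateException(
        s"Native reservation of $size bytes would exceed budget of $maxBytes bytes")
    }
  }

  override def unreserve(size: Long): Unit = reserved.addAndGet(-size)
}
```
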
2 changes: 1 addition & 1 deletion arrow-data-source/parquet/pom.xml
@@ -5,7 +5,7 @@
<parent>
<artifactId>spark-arrow-datasource</artifactId>
<groupId>com.intel.oap</groupId>
-<version>1.1.0</version>
+<version>1.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

30 changes: 4 additions & 26 deletions arrow-data-source/pom.xml
@@ -2,15 +2,15 @@
<parent>
<groupId>com.intel.oap</groupId>
<artifactId>native-sql-engine-parent</artifactId>
-<version>1.1.0</version>
+<version>1.1.1</version>
</parent>

<modelVersion>4.0.0</modelVersion>
<groupId>com.intel.oap</groupId>
<artifactId>spark-arrow-datasource</artifactId>
<name>OAP Project Spark Arrow Datasource</name>
<packaging>pom</packaging>
-<version>1.1.0</version>
+<version>1.1.1</version>
<inceptionYear>2008</inceptionYear>
<modules>
<module>common</module>
@@ -49,30 +49,8 @@
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
-<artifactId>hadoop-common</artifactId>
-<version>${hadoop.version}</version>
-<exclusions>
-<exclusion>
-<groupId>com.fasterxml.jackson.core</groupId>
-<artifactId>jackson-core</artifactId>
-</exclusion>
-<exclusion>
-<groupId>com.fasterxml.jackson.core</groupId>
-<artifactId>jackson-annotations</artifactId>
-</exclusion>
-<exclusion>
-<groupId>com.fasterxml.jackson.core</groupId>
-<artifactId>jackson-databind</artifactId>
-</exclusion>
-<exclusion>
-<groupId>org.slf4j</groupId>
-<artifactId>slf4j-log4j12</artifactId>
-</exclusion>
-<exclusion>
-<groupId>log4j</groupId>
-<artifactId>log4j</artifactId>
-</exclusion>
-</exclusions>
+<artifactId>hadoop-client</artifactId>
+<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
2 changes: 1 addition & 1 deletion arrow-data-source/script/build_arrow.sh
@@ -62,7 +62,7 @@ echo "ARROW_SOURCE_DIR=${ARROW_SOURCE_DIR}"
echo "ARROW_INSTALL_DIR=${ARROW_INSTALL_DIR}"
mkdir -p $ARROW_SOURCE_DIR
mkdir -p $ARROW_INSTALL_DIR
-git clone https://github.com/oap-project/arrow.git --branch arrow-3.0.0-oap $ARROW_SOURCE_DIR
+git clone https://github.com/oap-project/arrow.git --branch arrow-4.0.0-oap-1.1.1 $ARROW_SOURCE_DIR
pushd $ARROW_SOURCE_DIR

cmake ./cpp \
2 changes: 1 addition & 1 deletion arrow-data-source/standard/pom.xml
@@ -5,7 +5,7 @@
<parent>
<artifactId>spark-arrow-datasource</artifactId>
<groupId>com.intel.oap</groupId>
-<version>1.1.0</version>
+<version>1.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

@@ -107,7 +107,7 @@ class ArrowFileFormat extends FileFormat with DataSourceRegister with Serializab
.asScala
.toList
val itrList = taskList
-.map(task => task.scan())
+.map(task => task.execute())

Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => {
itrList.foreach(_.close())
@@ -120,7 +120,7 @@ class ArrowFileFormat extends FileFormat with DataSourceRegister with Serializab
val itr = itrList
.toIterator
.flatMap(itr => itr.asScala)
-.map(vsr => ArrowUtils.loadVectors(vsr, file.partitionValues, partitionSchema,
+.map(batch => ArrowUtils.loadBatch(batch, file.partitionValues, partitionSchema,
requiredSchema))
new UnsafeItr(itr).asInstanceOf[Iterator[InternalRow]]
}
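
Condensed from the change above: each native scan task is now driven through `execute()`, and the resulting `ArrowRecordBatch`es are turned into Spark `ColumnarBatch`es with `ArrowUtils.loadBatch`. The sketch below assumes `taskList`, `file`, `partitionSchema`, and `requiredSchema` are the values already in scope in the surrounding method, with the dataset and scanner setup elided:

```scala
import scala.collection.JavaConverters._

import org.apache.spark.TaskContext

// Run every scan task; each call returns a closeable iterator of record batches.
val itrList = taskList.map(task => task.execute())

// Release the native iterators when the Spark task finishes.
Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit] { _ =>
  itrList.foreach(_.close())
})

// One ColumnarBatch per Arrow record batch, with partition columns appended.
val batches = itrList.toIterator
  .flatMap(itr => itr.asScala)
  .map(batch => ArrowUtils.loadBatch(
    batch, file.partitionValues, partitionSchema, requiredSchema))
```
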
@@ -76,12 +76,12 @@ case class ArrowPartitionReaderFactory(
.toList

val vsrItrList = taskList
-.map(task => task.scan())
+.map(task => task.execute())

val batchItr = vsrItrList
.toIterator
.flatMap(itr => itr.asScala)
-.map(bundledVectors => ArrowUtils.loadVectors(bundledVectors, partitionedFile.partitionValues,
+.map(batch => ArrowUtils.loadBatch(batch, partitionedFile.partitionValues,
readPartitionSchema, readDataSchema))

new PartitionReader[ColumnarBatch] {
@@ -18,29 +18,28 @@
package com.intel.oap.spark.sql.execution.datasources.v2.arrow

import java.net.URI
-import java.util.TimeZone

import scala.collection.JavaConverters._

import com.intel.oap.vectorized.ArrowWritableColumnVector
-import org.apache.arrow.dataset.file.SingleFileDatasetFactory
-import org.apache.arrow.dataset.scanner.ScanTask
-import org.apache.arrow.vector.FieldVector
-import org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID
+import org.apache.arrow.dataset.file.FileSystemDatasetFactory
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch
import org.apache.arrow.vector.types.pojo.Schema
import org.apache.hadoop.fs.FileStatus

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.v2.arrow.{SparkMemoryUtils, SparkSchemaUtils}
import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.sql.vectorized.ColumnVector
import org.apache.spark.sql.vectorized.ColumnarBatch

object ArrowUtils {

def readSchema(file: FileStatus, options: CaseInsensitiveStringMap): Option[StructType] = {
-val factory: SingleFileDatasetFactory =
+val factory: FileSystemDatasetFactory =
makeArrowDiscovery(file.getPath.toString, -1L, -1L,
new ArrowOptions(options.asScala.toMap))
val schema = factory.inspect()
@@ -65,11 +64,11 @@
}

def makeArrowDiscovery(file: String, startOffset: Long, length: Long,
-options: ArrowOptions): SingleFileDatasetFactory = {
+options: ArrowOptions): FileSystemDatasetFactory = {

val format = getFormat(options).getOrElse(throw new IllegalStateException)
val allocator = SparkMemoryUtils.contextAllocator()
-val factory = new SingleFileDatasetFactory(allocator,
+val factory = new FileSystemDatasetFactory(allocator,
SparkMemoryUtils.contextMemoryPool(),
format,
rewriteUri(file),
@@ -80,72 +79,32 @@

def toArrowSchema(t: StructType): Schema = {
// fixme this might be platform dependent
-SparkSchemaUtils.toArrowSchema(t, TimeZone.getDefault.getID)
+SparkSchemaUtils.toArrowSchema(t, SQLConf.get.sessionLocalTimeZone)
}

-def loadVectors(bundledVectors: ScanTask.ArrowBundledVectors, partitionValues: InternalRow,
+def loadBatch(input: ArrowRecordBatch, partitionValues: InternalRow,
partitionSchema: StructType, dataSchema: StructType): ColumnarBatch = {
-val rowCount: Int = getRowCount(bundledVectors)
-val dataVectors = getDataVectors(bundledVectors, dataSchema)
-val dictionaryVectors = getDictionaryVectors(bundledVectors, dataSchema)
+val rowCount: Int = input.getLength

-val vectors = ArrowWritableColumnVector.loadColumns(rowCount, dataVectors.asJava,
-dictionaryVectors.asJava)
+val vectors = try {
+ArrowWritableColumnVector.loadColumns(rowCount, toArrowSchema(dataSchema), input)
+} finally {
+input.close()
+}
val partitionColumns = ArrowWritableColumnVector.allocateColumns(rowCount, partitionSchema)
(0 until partitionColumns.length).foreach(i => {
ColumnVectorUtils.populate(partitionColumns(i), partitionValues, i)
partitionColumns(i).setValueCount(rowCount)
partitionColumns(i).setIsConstant()
})

-val batch = new ColumnarBatch(vectors ++ partitionColumns, rowCount)
+val batch = new ColumnarBatch(
+vectors.map(_.asInstanceOf[ColumnVector]) ++
+partitionColumns.map(_.asInstanceOf[ColumnVector]),
+rowCount)
batch
}

-private def getRowCount(bundledVectors: ScanTask.ArrowBundledVectors) = {
-val valueVectors = bundledVectors.valueVectors
-val rowCount = valueVectors.getRowCount
-rowCount
-}

-private def getDataVectors(bundledVectors: ScanTask.ArrowBundledVectors,
-dataSchema: StructType): List[FieldVector] = {
-// TODO Deprecate following (bad performance maybe brought).
-// TODO Assert vsr strictly matches dataSchema instead.
-val valueVectors = bundledVectors.valueVectors
-dataSchema.map(f => {
-val vector = valueVectors.getVector(f.name)
-if (vector == null) {
-throw new IllegalStateException("Error: no vector named " + f.name + " in record bach")
-}
-vector
-}).toList
-}

-private def getDictionaryVectors(bundledVectors: ScanTask.ArrowBundledVectors,
-dataSchema: StructType): List[FieldVector] = {
-val valueVectors = bundledVectors.valueVectors
-val dictionaryVectorMap = bundledVectors.dictionaryVectors

-val fieldNameToDictionaryEncoding = valueVectors.getSchema.getFields.asScala.map(f => {
-f.getName -> f.getDictionary
-}).toMap

-val dictionaryVectorsWithNulls = dataSchema.map(f => {
-val de = fieldNameToDictionaryEncoding(f.name)

-Option(de) match {
-case None => null
-case _ =>
-if (de.getIndexType.getTypeID != ArrowTypeID.Int) {
-throw new IllegalArgumentException("Wrong index type: " + de.getIndexType)
-}
-dictionaryVectorMap.get(de.getId).getVector
-}
-}).toList
-dictionaryVectorsWithNulls
-}

private def getFormat(
options: ArrowOptions): Option[org.apache.arrow.dataset.file.FileFormat] = {
Option(options.originalFormat match {
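
One behavioural detail in the ArrowUtils change is easy to miss: `toArrowSchema` now uses the Spark session time zone rather than the JVM default, so timestamp fields are converted consistently even when driver and executor JVMs have different default zones. A minimal sketch of the difference:

```scala
import java.util.TimeZone

import org.apache.spark.sql.internal.SQLConf

// Before the change: whatever zone the local JVM happens to default to.
val jvmZone: String = TimeZone.getDefault.getID

// After the change: the session-level spark.sql.session.timeZone value.
val sessionZone: String = SQLConf.get.sessionLocalTimeZone

// toArrowSchema(schema, sessionZone) therefore yields the same Arrow timestamp
// fields on every node, independent of machine configuration.
```
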
2 changes: 1 addition & 1 deletion docs/ApacheArrowInstallation.md
@@ -30,7 +30,7 @@ Please make sure your cmake version is qualified based on the prerequisite.
# Arrow
``` shell
git clone https://github.com/oap-project/arrow.git
-cd arrow && git checkout arrow-3.0.0-oap-1.1
+cd arrow && git checkout arrow-4.0.0-oap-1.1.1
mkdir -p arrow/cpp/release-build
cd arrow/cpp/release-build
cmake -DARROW_DEPENDENCY_SOURCE=BUNDLED -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_CSV=ON -DARROW_HDFS=ON -DARROW_BOOST_USE_SHARED=ON -DARROW_JNI=ON -DARROW_DATASET=ON -DARROW_WITH_PROTOBUF=ON -DARROW_WITH_SNAPPY=ON -DARROW_WITH_LZ4=ON -DARROW_FILESYSTEM=ON -DARROW_JSON=ON ..
2 changes: 1 addition & 1 deletion docs/OAP-Developer-Guide.md
@@ -33,7 +33,7 @@ Then the dependencies below will be installed:
* [HPNL](https://github.com/Intel-bigdata/HPNL)
* [PMDK](https://github.com/pmem/pmdk)
* [OneAPI](https://software.intel.com/content/www/us/en/develop/tools/oneapi.html)
-* [Arrow](https://github.com/oap-project/arrow/tree/arrow-3.0.0-oap-1.1)
+* [Arrow](https://github.com/oap-project/arrow/tree/arrow-4.0.0-oap-1.1.1)
* [LLVM](https://llvm.org/)

Run the following command to learn more.
2 changes: 1 addition & 1 deletion docs/OAP-Installation-Guide.md
@@ -36,7 +36,7 @@ Once finished steps above, you have completed OAP dependencies installation and

Dependencies below are required by OAP and all of them are included in OAP Conda package, they will be automatically installed in your cluster when you Conda install OAP. Ensure you have activated environment which you created in the previous steps.

-- [Arrow](https://github.com/oap-project/arrow/tree/arrow-3.0.0-oap-1.1)
+- [Arrow](https://github.com/oap-project/arrow/tree/arrow-4.0.0-oap-1.1.1)
- [Plasma](http://arrow.apache.org/blog/2017/08/08/plasma-in-memory-object-store/)
- [Memkind](https://anaconda.org/intel/memkind)
- [Vmemcache](https://anaconda.org/intel/vmemcache)
2 changes: 1 addition & 1 deletion mkdocs.yml
@@ -25,5 +25,5 @@ theme: readthedocs
plugins:
- search
- mkdocs-versioning:
-version: 1.1.0
+version: 1.1.1
exclude_from_nav: ["image", "js", "css", "fonts", "img"]
22 changes: 12 additions & 10 deletions native-sql-engine/core/pom.xml
@@ -17,13 +17,13 @@
<parent>
<groupId>com.intel.oap</groupId>
<artifactId>native-sql-engine-parent</artifactId>
-<version>1.1.0</version>
+<version>1.1.1</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<groupId>com.intel.oap</groupId>
<artifactId>spark-columnar-core</artifactId>
-<version>1.1.0</version>
+<version>1.1.1</version>
<packaging>jar</packaging>
<name>OAP Project Spark Columnar Plugin</name>

@@ -78,7 +78,7 @@
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
-<version>3.0.0</version>
+<version>${arrow.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
@@ -134,11 +134,6 @@
<groupId>com.intel.oap</groupId>
<version>${project.version}</version>
</dependency>
-<dependency>
-<groupId>com.google.flatbuffers</groupId>
-<artifactId>flatbuffers-java</artifactId>
-<version>1.9.0</version>
-</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
@@ -259,8 +254,14 @@
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
-<artifactId>hadoop-auth</artifactId>
-<version>${hadoop.version}</version>
+<artifactId>hadoop-client</artifactId>
+<scope>provided</scope>
</dependency>
+<dependency>
+<groupId>com.intel.oap</groupId>
+<artifactId>spark-arrow-datasource-standard</artifactId>
+<version>${project.version}</version>
+<scope>test</scope>
+</dependency>
</dependencies>

@@ -307,6 +308,7 @@
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
+<version>4.4.0</version>
<configuration>
<recompileMode>${scala.recompile.mode}</recompileMode>
</configuration>
(Diffs for the remaining changed files are not shown.)
