Revert "[NSE-728] Upgrade to Arrow 7.0.0 (oap-project#729)"
This reverts commit e329253.
weiting-chen committed Apr 29, 2022
1 parent d7c1ac4 commit 48a68ca
Showing 30 changed files with 342 additions and 244 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tpch.yml
@@ -51,7 +51,7 @@ jobs:
run: |
cd /tmp
git clone https://github.com/oap-project/arrow.git
- cd arrow && git checkout arrow-7.0.0-oap && cd cpp
+ cd arrow && git checkout arrow-4.0.0-oap && cd cpp
mkdir build && cd build
cmake .. -DARROW_JNI=ON -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_ORC=ON -DARROW_CSV=ON -DARROW_HDFS=ON -DARROW_FILESYSTEM=ON -DARROW_WITH_SNAPPY=ON -DARROW_JSON=ON -DARROW_DATASET=ON -DARROW_WITH_LZ4=ON -DARROW_JEMALLOC=OFF && make -j2
sudo make install
6 changes: 3 additions & 3 deletions .github/workflows/unittests.yml
@@ -45,7 +45,7 @@ jobs:
run: |
cd /tmp
git clone https://github.com/oap-project/arrow.git
- cd arrow && git checkout arrow-7.0.0-oap && cd cpp
+ cd arrow && git checkout arrow-4.0.0-oap && cd cpp
mkdir build && cd build
cmake .. -DARROW_JNI=ON -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_ORC=ON -DARROW_CSV=ON -DARROW_HDFS=ON -DARROW_FILESYSTEM=ON -DARROW_WITH_SNAPPY=ON -DARROW_JSON=ON -DARROW_DATASET=ON -DARROW_WITH_LZ4=ON -DGTEST_ROOT=/usr/src/gtest && make -j2
sudo make install
@@ -88,7 +88,7 @@ jobs:
run: |
cd /tmp
git clone https://github.com/oap-project/arrow.git
- cd arrow && git checkout arrow-7.0.0-oap && cd cpp
+ cd arrow && git checkout arrow-4.0.0-oap && cd cpp
mkdir build && cd build
cmake .. -DARROW_JNI=ON -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_ORC=ON -DARROW_CSV=ON -DARROW_HDFS=ON -DARROW_FILESYSTEM=ON -DARROW_WITH_SNAPPY=ON -DARROW_JSON=ON -DARROW_DATASET=ON -DARROW_WITH_LZ4=ON -DGTEST_ROOT=/usr/src/gtest && make -j2
sudo make install
@@ -133,7 +133,7 @@ jobs:
run: |
cd /tmp
git clone https://github.com/oap-project/arrow.git
- cd arrow && git checkout arrow-7.0.0-oap && cd cpp
+ cd arrow && git checkout arrow-4.0.0-oap && cd cpp
mkdir build && cd build
cmake .. -DARROW_JNI=ON -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_ORC=ON -DARROW_CSV=ON -DARROW_HDFS=ON -DARROW_FILESYSTEM=ON -DARROW_WITH_SNAPPY=ON -DARROW_JSON=ON -DARROW_DATASET=ON -DARROW_WITH_LZ4=ON -DGTEST_ROOT=/usr/src/gtest && make -j2
sudo make install
2 changes: 1 addition & 1 deletion arrow-data-source/README.md
@@ -117,7 +117,7 @@ You have to use a customized Arrow to support for our datasets Java API.

```
// build arrow-cpp
- git clone -b arrow-7.0.0-oap https://github.com/oap-project/arrow.git
+ git clone -b arrow-4.0.0-oap https://github.com/oap-project/arrow.git
cd arrow/cpp
mkdir build
cd build
@@ -27,6 +27,7 @@ import java.util.regex.Pattern

import com.intel.oap.spark.sql.ArrowWriteQueue.EOS_BATCH
import com.intel.oap.spark.sql.ArrowWriteQueue.ScannerImpl
+ import org.apache.arrow.dataset.file.DatasetFileWriter
import org.apache.arrow.dataset.file.format.FileFormat
import org.apache.arrow.dataset.scanner.Scanner
import org.apache.arrow.dataset.scanner.ScanTask
@@ -46,15 +47,12 @@ class ArrowWriteQueue(schema: Schema, fileFormat: FileFormat, outputFileURI: Str
val dirURI = matcher.group(1)
val fileName = matcher.group(2)

- // disable write by arrow 7.0.0
- // DatasetFileWriter.write(scanner, fileFormat, dirURI, Array(), 1, fileName)
+ DatasetFileWriter.write(scanner, fileFormat, dirURI, Array(), 1, fileName)
}, "ArrowWriteQueue - " + UUID.randomUUID().toString)

writeThread.start()

def enqueue(batch: ArrowRecordBatch): Unit = {
- // disable write by arrow 7.0.0
- throw new UnsupportedOperationException("write is disabled by arrow 7.0.0 rebase")
+ scanner.enqueue(batch)
}

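For context, a minimal usage sketch of the write path this hunk restores. Only the `ArrowWriteQueue` constructor and `enqueue` visible above are taken from the diff; the `schema`, `fileFormat`, `recordBatches` names and the output URI are illustrative assumptions about caller-side code.

```scala
// Minimal sketch (assumed caller-side code, not part of this commit).
import com.intel.oap.spark.sql.ArrowWriteQueue

// schema: org.apache.arrow.vector.types.pojo.Schema; fileFormat: an Arrow dataset
// FileFormat (e.g. built the way ArrowUtils builds one); recordBatches: batches to persist.
val queue = new ArrowWriteQueue(schema, fileFormat, "file:///tmp/output/part-0.parquet")

// Each enqueued batch feeds the internal ScannerImpl; after this revert the writer
// thread drains it via DatasetFileWriter.write(...) instead of throwing
// UnsupportedOperationException.
recordBatches.foreach(batch => queue.enqueue(batch))
// End-of-stream (EOS_BATCH) signalling and queue shutdown are outside this hunk.
```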
4 changes: 2 additions & 2 deletions arrow-data-source/script/build_arrow.sh
@@ -62,7 +62,7 @@ echo "ARROW_SOURCE_DIR=${ARROW_SOURCE_DIR}"
echo "ARROW_INSTALL_DIR=${ARROW_INSTALL_DIR}"
mkdir -p $ARROW_SOURCE_DIR
mkdir -p $ARROW_INSTALL_DIR
- git clone https://github.com/oap-project/arrow.git --branch arrow-7.0.0-oap $ARROW_SOURCE_DIR
+ git clone https://github.com/oap-project/arrow.git --branch arrow-4.0.0-oap $ARROW_SOURCE_DIR
pushd $ARROW_SOURCE_DIR

cmake ./cpp \
@@ -98,7 +98,7 @@ make -j$NPROC
make install

cd java
- mvn clean install -P arrow-jni -pl dataset,gandiva -am -Darrow.cpp.build.dir=${ARROW_INSTALL_DIR}/lib -DskipTests -Dcheckstyle.skip
+ mvn clean install -P arrow-jni -am -Darrow.cpp.build.dir=${ARROW_INSTALL_DIR}/lib -DskipTests -Dcheckstyle.skip
popd
echo "Finish to build Arrow from Source !!!"
else
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._

import com.intel.oap.spark.sql.ArrowWriteExtension.FakeRow
import com.intel.oap.spark.sql.ArrowWriteQueue
- import com.intel.oap.spark.sql.execution.datasources.v2.arrow.{ArrowOptions, ArrowUtils}
+ import com.intel.oap.spark.sql.execution.datasources.v2.arrow.{ArrowFilters, ArrowOptions, ArrowUtils}
import com.intel.oap.spark.sql.execution.datasources.v2.arrow.ArrowSQLConf._
import com.intel.oap.vectorized.ArrowWritableColumnVector
import org.apache.arrow.dataset.scanner.ScanOptions
@@ -129,8 +129,7 @@ class ArrowFileFormat extends FileFormat with DataSourceRegister with Serializab
val dataset = factory.finish(ArrowUtils.toArrowSchema(requiredSchema));

val filter = if (enableFilterPushDown) {
- // disable filter pushdown by arrow 7.0.0
- org.apache.arrow.dataset.filter.Filter.EMPTY
+ ArrowFilters.translateFilters(filters)
} else {
org.apache.arrow.dataset.filter.Filter.EMPTY
}
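For reference, a hedged sketch of what the restored `ArrowFilters.translateFilters(filters)` branch consumes. The column names and values are invented for illustration; the calls themselves are the ones re-added in the ArrowFilters.scala hunk below.

```scala
// Illustration only: pushed-down Spark predicates and their Arrow translation.
import com.intel.oap.spark.sql.execution.datasources.v2.arrow.ArrowFilters
import org.apache.spark.sql.sources.{Filter, GreaterThan, IsNotNull}

val pushed: Seq[Filter] = Seq(GreaterThan("age", 21), IsNotNull("name"))

// With pushdown enabled this yields a single Arrow dataset Filter whose tree
// AND-s the translated predicates; with pushdown disabled Filter.EMPTY is used.
val arrowFilter: org.apache.arrow.dataset.filter.Filter =
  ArrowFilters.translateFilters(pushed)
```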
@@ -0,0 +1,206 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.intel.oap.spark.sql.execution.datasources.v2.arrow

import org.apache.arrow.dataset.DatasetTypes
import org.apache.arrow.dataset.DatasetTypes.TreeNode
import org.apache.arrow.dataset.filter.FilterImpl

import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType

object ArrowFilters {
def pruneWithSchema(pushedFilters: Array[Filter], schema: StructType): Seq[Filter] = {
pushedFilters.filter(pushedFilter => {
isToBeAccepted(pushedFilter, schema)
})
}

private def isToBeAccepted(pushedFilter: Filter, schema: StructType): Boolean = {
pushedFilter match {
case EqualTo(attribute, value) => existsIn(attribute, schema)
case GreaterThan(attribute, value) => existsIn(attribute, schema)
case GreaterThanOrEqual(attribute, value) => existsIn(attribute, schema)
case LessThan(attribute, value) => existsIn(attribute, schema)
case LessThanOrEqual(attribute, value) => existsIn(attribute, schema)
case Not(child) => isToBeAccepted(child, schema)
case And(left, right) => isToBeAccepted(left, schema) && isToBeAccepted(right, schema)
case Or(left, right) => isToBeAccepted(left, schema) && isToBeAccepted(right, schema)
case IsNotNull(attribute) => existsIn(attribute, schema)
case IsNull(attribute) => existsIn(attribute, schema)
case _ => false // fixme complete this
}
}

private def existsIn(attr: String, schema: StructType): Boolean = {
schema.foreach(f => {
if (f.name == attr) {
return true;
}
})
false
}

def translateFilters(pushedFilters: Seq[Filter]): org.apache.arrow.dataset.filter.Filter = {
val node = pushedFilters
.flatMap(translateFilter)
.reduceOption((t1: TreeNode, t2: TreeNode) => {
DatasetTypes.TreeNode.newBuilder.setAndNode(
DatasetTypes.AndNode.newBuilder()
.setLeftArg(t1)
.setRightArg(t2)
.build()).build()
})
if (node.isDefined) {
new FilterImpl(DatasetTypes.Condition.newBuilder()
.setRoot(node.get).build)
} else {
org.apache.arrow.dataset.filter.Filter.EMPTY
}
}

private def translateValue(value: Any): Option[TreeNode] = {
value match {
case v: Integer => Some(
DatasetTypes.TreeNode.newBuilder.setIntNode(
DatasetTypes.IntNode.newBuilder.setValue(v).build)
.build)
case v: Long => Some(
DatasetTypes.TreeNode.newBuilder.setLongNode(
DatasetTypes.LongNode.newBuilder.setValue(v).build)
.build)
case v: Float => Some(
DatasetTypes.TreeNode.newBuilder.setFloatNode(
DatasetTypes.FloatNode.newBuilder.setValue(v).build)
.build)
case v: Double => Some(
DatasetTypes.TreeNode.newBuilder.setDoubleNode(
DatasetTypes.DoubleNode.newBuilder.setValue(v).build)
.build)
case v: Boolean => Some(
DatasetTypes.TreeNode.newBuilder.setBooleanNode(
DatasetTypes.BooleanNode.newBuilder.setValue(v).build)
.build)
case _ => None // fixme complete this
}
}

private def translateFilter(pushedFilter: Filter): Option[TreeNode] = {
pushedFilter match {
case EqualTo(attribute, value) =>
createComparisonNode("equal", attribute, value)
case GreaterThan(attribute, value) =>
createComparisonNode("greater", attribute, value)
case GreaterThanOrEqual(attribute, value) =>
createComparisonNode("greater_equal", attribute, value)
case LessThan(attribute, value) =>
createComparisonNode("less", attribute, value)
case LessThanOrEqual(attribute, value) =>
createComparisonNode("less_equal", attribute, value)
case Not(child) =>
createNotNode(child)
case And(left, right) =>
createAndNode(left, right)
case Or(left, right) =>
createOrNode(left, right)
case IsNotNull(attribute) =>
createIsNotNullNode(attribute)
case IsNull(attribute) =>
createIsNullNode(attribute)
case _ => None // fixme complete this
}
}

private def createComparisonNode(opName: String,
attribute: String, value: Any): Option[TreeNode] = {
val translatedValue = translateValue(value)
translatedValue match {
case Some(v) => Some(
DatasetTypes.TreeNode.newBuilder.setCpNode(
DatasetTypes.ComparisonNode.newBuilder
.setOpName(opName) // todo make op names enumerable
.setLeftArg(
DatasetTypes.TreeNode.newBuilder.setFieldNode(
DatasetTypes.FieldNode.newBuilder.setName(attribute).build)
.build)
.setRightArg(v)
.build)
.build)
case None => None
}
}

def createNotNode(child: Filter): Option[TreeNode] = {
val translatedChild = translateFilter(child)
if (translatedChild.isEmpty) {
return None
}
Some(DatasetTypes.TreeNode.newBuilder
.setNotNode(DatasetTypes.NotNode.newBuilder.setArgs(translatedChild.get).build()).build())
}

def createIsNotNullNode(attribute: String): Option[TreeNode] = {
Some(DatasetTypes.TreeNode.newBuilder
.setIsValidNode(
DatasetTypes.IsValidNode.newBuilder.setArgs(
DatasetTypes.TreeNode.newBuilder.setFieldNode(
DatasetTypes.FieldNode.newBuilder.setName(attribute).build)
.build).build()).build())
}

def createIsNullNode(attribute: String): Option[TreeNode] = {
Some(DatasetTypes.TreeNode.newBuilder
.setNotNode(
DatasetTypes.NotNode.newBuilder.setArgs(
DatasetTypes.TreeNode.newBuilder
.setIsValidNode(
DatasetTypes.IsValidNode.newBuilder.setArgs(
DatasetTypes.TreeNode.newBuilder.setFieldNode(
DatasetTypes.FieldNode.newBuilder.setName(attribute).build)
.build)
.build()).build()).build()).build())
}

def createAndNode(left: Filter, right: Filter): Option[TreeNode] = {
val translatedLeft = translateFilter(left)
val translatedRight = translateFilter(right)
if (translatedLeft.isEmpty || translatedRight.isEmpty) {
return None
}
Some(DatasetTypes.TreeNode.newBuilder
.setAndNode(DatasetTypes.AndNode.newBuilder
.setLeftArg(translatedLeft.get)
.setRightArg(translatedRight.get)
.build())
.build())
}

def createOrNode(left: Filter, right: Filter): Option[TreeNode] = {
val translatedLeft = translateFilter(left)
val translatedRight = translateFilter(right)
if (translatedLeft.isEmpty || translatedRight.isEmpty) {
return None
}
Some(DatasetTypes.TreeNode.newBuilder
.setOrNode(DatasetTypes.OrNode.newBuilder
.setLeftArg(translatedLeft.get)
.setRightArg(translatedRight.get)
.build())
.build())
}
}
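The v2 reader path restored in ArrowPartitionReaderFactory (next hunk) first prunes pushed filters against the read schema, then translates the survivors. A small hedged sketch of that two-step call; the schema and predicates are made up for illustration.

```scala
// Sketch: drop predicates on unknown columns, then build the Arrow filter.
import com.intel.oap.spark.sql.execution.datasources.v2.arrow.ArrowFilters
import org.apache.spark.sql.sources.{EqualTo, Filter, LessThan}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val readDataSchema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("city", StringType)))

val pushedFilters: Array[Filter] = Array(
  EqualTo("id", 42),            // kept: "id" exists in the read schema
  LessThan("missing_col", 5))   // dropped by pruneWithSchema: unknown column

val arrowFilter = ArrowFilters.translateFilters(
  ArrowFilters.pruneWithSchema(pushedFilters, readDataSchema))
```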
@@ -61,8 +61,7 @@ case class ArrowPartitionReaderFactory(
partitionedFile.start, partitionedFile.length, options)
val dataset = factory.finish(ArrowUtils.toArrowSchema(readDataSchema))
val filter = if (enableFilterPushDown) {
- // disable filter pushdown by arrow 7.0.0
- org.apache.arrow.dataset.filter.Filter.EMPTY
+ ArrowFilters.translateFilters(ArrowFilters.pruneWithSchema(pushedFilters, readDataSchema))
} else {
org.apache.arrow.dataset.filter.Filter.EMPTY
}
@@ -117,8 +117,7 @@ object ArrowUtils {
val paramMap = options.parameters.toMap.asJava
options.originalFormat match {
case "parquet" => org.apache.arrow.dataset.file.format.ParquetFileFormat.create(paramMap)
- // disable orc by arrow 7.0.0
- // case "orc" => org.apache.arrow.dataset.file.format.OrcFileFormat.create(paramMap)
+ case "orc" => org.apache.arrow.dataset.file.format.OrcFileFormat.create(paramMap)
case "csv" => org.apache.arrow.dataset.file.format.CsvFileFormat.create(paramMap)
case _ => throw new IllegalArgumentException("Unrecognizable format")
}
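With the "orc" case restored in ArrowUtils, ORC files can again be read through the Arrow data source, as the re-enabled tests below exercise. A hedged read sketch: the path is illustrative, `spark` is an existing SparkSession, and "arrow" is assumed to be the short name ArrowFileFormat registers via DataSourceRegister.

```scala
// Sketch: reading an ORC file via the Arrow data source after this revert.
import com.intel.oap.spark.sql.execution.datasources.v2.arrow.ArrowOptions

val people = spark.read
  .format("arrow")
  .option(ArrowOptions.KEY_ORIGINAL_FORMAT, "orc")
  .load("/path/to/people.orc")

people.show()
```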
@@ -203,7 +203,7 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession {
assert(rows.length === 3)
}

ignore("simple parquet write") {
test("simple parquet write") {
val path = ArrowDataSourceTest.locateResourcePath(parquetFile3)
val frame = spark.read
.option(ArrowOptions.KEY_ORIGINAL_FORMAT, "parquet")
@@ -339,7 +339,7 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession {
df.show()
}

ignore("orc reader on data type: struct, array, map") {
test("orc reader on data type: struct, array, map") {
val path = ArrowDataSourceTest.locateResourcePath(orcFile1)
val frame = spark.read
.option(ArrowOptions.KEY_ORIGINAL_FORMAT, "orc")
@@ -363,7 +363,7 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession {
}

private val orcFile = "people.orc"
ignore("read orc file") {
test("read orc file") {
val path = ArrowDataSourceTest.locateResourcePath(orcFile)
verifyFrame(
spark.read
@@ -372,15 +372,15 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession {
.load(path), 2, 3)
}

ignore("read orc file - programmatic API ") {
test("read orc file - programmatic API ") {
val path = ArrowDataSourceTest.locateResourcePath(orcFile)
verifyFrame(
spark.read
.option(ArrowOptions.KEY_ORIGINAL_FORMAT, "orc")
.arrow(path), 2, 3)
}

ignore("create catalog table for orc") {
test("create catalog table for orc") {
val path = ArrowDataSourceTest.locateResourcePath(orcFile)
// spark.catalog.createTable("people", path, "arrow")
spark.catalog.createTable("people", "arrow", Map("path" -> path, "originalFormat" -> "orc"))
Expand All @@ -389,7 +389,7 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession {
verifyFrame(spark.sql(sql), 2, 3)
}

ignore("simple SQL query on orc file ") {
test("simple SQL query on orc file ") {
val path = ArrowDataSourceTest.locateResourcePath(orcFile)
val frame = spark.read
.option(ArrowOptions.KEY_ORIGINAL_FORMAT, "orc")
2 changes: 1 addition & 1 deletion docs/ApacheArrowInstallation.md
@@ -30,7 +30,7 @@ Please make sure your cmake version is qualified based on the prerequisite.
# Arrow
``` shell
git clone https://github.com/oap-project/arrow.git
- cd arrow && git checkout arrow-7.0.0-oap
+ cd arrow && git checkout arrow-4.0.0-oap
mkdir -p arrow/cpp/release-build
cd arrow/cpp/release-build
cmake -DARROW_DEPENDENCY_SOURCE=BUNDLED -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_ORC=ON -DARROW_CSV=ON -DARROW_HDFS=ON -DARROW_BOOST_USE_SHARED=ON -DARROW_JNI=ON -DARROW_DATASET=ON -DARROW_WITH_PROTOBUF=ON -DARROW_WITH_SNAPPY=ON -DARROW_WITH_LZ4=ON -DARROW_FILESYSTEM=ON -DARROW_JSON=ON ..
(Diffs for the remaining changed files are not shown.)
