From 0a0157aecc02dc27bc9588cbaed6893ae239fa1f Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Wed, 3 Nov 2021 16:41:23 +0800 Subject: [PATCH] Add ORC Dataset JNI Support --- cpp/src/jni/dataset/jni_wrapper.cc | 16 ++++++ .../apache/arrow/dataset/file/JniWrapper.java | 1 + .../dataset/file/format/OrcFileFormat.java | 46 ++++++++++++++++++ .../dataset/file/TestFileSystemDataset.java | 37 ++++++++++++++ .../src/test/resources/data/people.orc | Bin 0 -> 507 bytes 5 files changed, 100 insertions(+) create mode 100644 java/dataset/src/main/java/org/apache/arrow/dataset/file/format/OrcFileFormat.java create mode 100644 java/dataset/src/test/resources/data/people.orc diff --git a/cpp/src/jni/dataset/jni_wrapper.cc b/cpp/src/jni/dataset/jni_wrapper.cc index baf916b29ca9f..b319f18f5ad22 100644 --- a/cpp/src/jni/dataset/jni_wrapper.cc +++ b/cpp/src/jni/dataset/jni_wrapper.cc @@ -772,6 +772,22 @@ JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_file_JniWrapper_createParq JNI_METHOD_END(-1L) } +/* + * Class: org_apache_arrow_dataset_file_JniWrapper + * Method: createOrcFileFormat + * Signature: ([Ljava/lang/String;)J + */ +JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_file_JniWrapper_createOrcFileFormat + (JNIEnv* env, jobject, jobjectArray dict_columns) { + JNI_METHOD_START + auto format = std::make_shared(); + auto dict_column_vector = ToStringVector(env, dict_columns); + // format->reader_options.dict_columns = std::unordered_set( + // dict_column_vector.begin(), dict_column_vector.end()); + return arrow::jniutil::CreateNativeRef(format); + JNI_METHOD_END(-1L) +} + /* * Class: org_apache_arrow_dataset_file_JniWrapper * Method: createCsvFileFormat diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java index d4812d637e883..da1a5142863dc 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java @@ -85,6 +85,7 @@ public native void writeFromScannerToFile(NativeSerializedRecordBatchIterator it // todo javadoc public native long createParquetFileFormat(String[] dictColumns); + public native long createOrcFileFormat(String[] dictColumns); public native long createCsvFileFormat(char delimiter); public native void releaseFileFormatInstance(long nativeInstanceId); } diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/format/OrcFileFormat.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/format/OrcFileFormat.java new file mode 100644 index 0000000000000..d18bd4269822c --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/format/OrcFileFormat.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.file.format; + +import org.apache.arrow.dataset.file.JniWrapper; + +import java.util.Collections; +import java.util.Map; + +public class OrcFileFormat extends FileFormatBase { + private final String[] dictColumns; + + public OrcFileFormat(String[] dictColumns) { + super(createOrcFileFormat(dictColumns)); + this.dictColumns = dictColumns; + } + + // Typically for Spark config parsing + public static OrcFileFormat create(Map options) { + return new OrcFileFormat(parseStringArray(getOptionValue(options, "dictColumns", "")) + ); + } + + public static OrcFileFormat createDefault() { + return create(Collections.emptyMap()); + } + + private static long createOrcFileFormat(String[] dictColumns) { + return JniWrapper.get().createOrcFileFormat(dictColumns); + } +} diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java index fe93ab16a540b..a2ebe67b8b1d2 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java @@ -34,6 +34,7 @@ import com.google.common.collect.ImmutableMap; import org.apache.arrow.dataset.ParquetWriteSupport; import org.apache.arrow.dataset.file.format.CsvFileFormat; +import org.apache.arrow.dataset.file.format.OrcFileFormat; import org.apache.arrow.dataset.file.format.ParquetFileFormat; import org.apache.arrow.dataset.filter.Filter; import org.apache.arrow.dataset.jni.NativeDataset; @@ -91,6 +92,42 @@ public void testParquetRead() throws Exception { AutoCloseables.close(factory); } + @Test + public void testOrcRead() throws Exception { + RootAllocator allocator = new RootAllocator(Long.MAX_VALUE); + FileSystemDatasetFactory factory = new FileSystemDatasetFactory(allocator, + NativeMemoryPool.getDefault(), OrcFileFormat.createDefault(), "file://" + resourcePath("data/people.orc")); + ScanOptions options = new ScanOptions(new String[] {"name", "age", "job"}, Filter.EMPTY, 100); + Schema schema = factory.inspect(); + NativeDataset dataset = factory.finish(schema); + NativeScanner nativeScanner = dataset.newScan(options); + List scanTasks = collect(nativeScanner.scan()); + Assert.assertEquals(1, scanTasks.size()); + ScanTask scanTask = scanTasks.get(0); + ScanTask.BatchIterator itr = scanTask.execute(); + + VectorSchemaRoot vsr = VectorSchemaRoot.create(schema, allocator); + VectorLoader loader = new VectorLoader(vsr); + int rowCount = 0; + while (itr.hasNext()) { + try (ArrowRecordBatch next = itr.next()) { + loader.load(next); + } + rowCount += vsr.getRowCount(); + + // check if projector is applied + Assert.assertEquals("Schema(metadata: {org.apache.spark.version=3.1.1})", + vsr.getSchema().toString()); + } + System.out.println("vsr.getSchema().toString():" + vsr.getSchema().toString()); + Assert.assertEquals(2, rowCount); + assertEquals(3, schema.getFields().size()); + assertEquals("name", schema.getFields().get(0).getName()); + assertEquals("age", schema.getFields().get(1).getName()); + assertEquals("job", schema.getFields().get(2).getName()); + AutoCloseables.close(vsr, allocator); + } + @Test public void testCsvRead() throws Exception { RootAllocator allocator = new RootAllocator(Long.MAX_VALUE); diff --git a/java/dataset/src/test/resources/data/people.orc b/java/dataset/src/test/resources/data/people.orc new file mode 100644 index 0000000000000000000000000000000000000000..ae4b08c34ecd8ae7af73c13abfdb8cfeb62b69f9 GIT binary patch literal 507 zcmaiw-AV#M6vt=IS=VtA8wRPtNNkaiE*7P`#fv3Fq6-zf*}GU{uupec!KKS@~P9s!A9~WnS=ZnMtKjY~@`q~pPVRBC*V@%ft1T~2#*-V$tvtn+IqM?>Rv#J2? zi@u7@BlX&@*Q9+TE2nm4@3Y(%`ZG1{33Dgt*pOe0ct4Ig8?yDsvh;^p*~ zJEA!xa>d;oi0EoCo+G*%)P_i`Xb_A7TzCQ%mlL_MxAT*)WJ}h*dP+Rm`rA*kO;=xd zLlQ1cvo-5Luq&z>Ibg$Eqz|@%>71V1Y!-khQ{X#~4y2*q4AVTly0+mmPm8;7WsBua uw$P>Huow2a5IL#QFH)m#Qlo#R#_Q1;OSpLEiUq)`ooToH^ols14(e|j{#IB3 literal 0 HcmV?d00001