Skip to content

Commit

Permalink
Add ORC Dataset JNI Support
Browse files Browse the repository at this point in the history
  • Loading branch information
zhixingheyi-tian committed Nov 3, 2021
1 parent 8d7e250 commit d2a4b1c
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 0 deletions.
16 changes: 16 additions & 0 deletions cpp/src/jni/dataset/jni_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,22 @@ JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_file_JniWrapper_createParq
JNI_METHOD_END(-1L)
}

/*
* Class: org_apache_arrow_dataset_file_JniWrapper
* Method: createOrcFileFormat
* Signature: ([Ljava/lang/String;)J
*/
JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_file_JniWrapper_createOrcFileFormat
(JNIEnv* env, jobject, jobjectArray dict_columns) {
JNI_METHOD_START
auto format = std::make_shared<arrow::dataset::OrcFileFormat>();
auto dict_column_vector = ToStringVector(env, dict_columns);
// format->reader_options.dict_columns = std::unordered_set<std::string>(
// dict_column_vector.begin(), dict_column_vector.end());
return arrow::jniutil::CreateNativeRef(format);
JNI_METHOD_END(-1L)
}

/*
* Class: org_apache_arrow_dataset_file_JniWrapper
* Method: createCsvFileFormat
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public native void writeFromScannerToFile(NativeSerializedRecordBatchIterator it

// todo javadoc
public native long createParquetFileFormat(String[] dictColumns);
public native long createOrcFileFormat(String[] dictColumns);
public native long createCsvFileFormat(char delimiter);
public native void releaseFileFormatInstance(long nativeInstanceId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.dataset.file.format;

import org.apache.arrow.dataset.file.JniWrapper;

import java.util.Collections;
import java.util.Map;

public class OrcFileFormat extends FileFormatBase {
private final String[] dictColumns;

public OrcFileFormat(String[] dictColumns) {
super(createOrcFileFormat(dictColumns));
this.dictColumns = dictColumns;
}

// Typically for Spark config parsing
public static OrcFileFormat create(Map<String, String> options) {
return new OrcFileFormat(parseStringArray(getOptionValue(options, "dictColumns", ""))
);
}

public static OrcFileFormat createDefault() {
return create(Collections.emptyMap());
}

private static long createOrcFileFormat(String[] dictColumns) {
return JniWrapper.get().createOrcFileFormat(dictColumns);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.google.common.collect.ImmutableMap;
import org.apache.arrow.dataset.ParquetWriteSupport;
import org.apache.arrow.dataset.file.format.CsvFileFormat;
import org.apache.arrow.dataset.file.format.OrcFileFormat;
import org.apache.arrow.dataset.file.format.ParquetFileFormat;
import org.apache.arrow.dataset.filter.Filter;
import org.apache.arrow.dataset.jni.NativeDataset;
Expand Down Expand Up @@ -91,6 +92,42 @@ public void testParquetRead() throws Exception {
AutoCloseables.close(factory);
}

@Test
public void testOrcRead() throws Exception {
RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
FileSystemDatasetFactory factory = new FileSystemDatasetFactory(allocator,
NativeMemoryPool.getDefault(), OrcFileFormat.createDefault(), "file://" + resourcePath("data/people.orc"));
ScanOptions options = new ScanOptions(new String[] {"name", "age", "job"}, Filter.EMPTY, 100);
Schema schema = factory.inspect();
NativeDataset dataset = factory.finish(schema);
NativeScanner nativeScanner = dataset.newScan(options);
List<? extends ScanTask> scanTasks = collect(nativeScanner.scan());
Assert.assertEquals(1, scanTasks.size());
ScanTask scanTask = scanTasks.get(0);
ScanTask.BatchIterator itr = scanTask.execute();

VectorSchemaRoot vsr = VectorSchemaRoot.create(schema, allocator);
VectorLoader loader = new VectorLoader(vsr);
int rowCount = 0;
while (itr.hasNext()) {
try (ArrowRecordBatch next = itr.next()) {
loader.load(next);
}
rowCount += vsr.getRowCount();

// check if projector is applied
Assert.assertEquals("Schema<name: Utf8, age: Int(32, true), job: Utf8>(metadata: {org.apache.spark.version=3.1.1})",
vsr.getSchema().toString());
}
System.out.println("vsr.getSchema().toString():" + vsr.getSchema().toString());
Assert.assertEquals(2, rowCount);
assertEquals(3, schema.getFields().size());
assertEquals("name", schema.getFields().get(0).getName());
assertEquals("age", schema.getFields().get(1).getName());
assertEquals("job", schema.getFields().get(2).getName());
AutoCloseables.close(vsr, allocator);
}

@Test
public void testCsvRead() throws Exception {
RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
Expand Down
Binary file added java/dataset/src/test/resources/data/people.orc
Binary file not shown.

0 comments on commit d2a4b1c

Please sign in to comment.