Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JNI: Pass names of children struct columns to native Arrow IPC writer [skip ci] #7598

Merged
merged 16 commits into from
Mar 20, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion java/src/main/java/ai/rapids/cudf/ArrowIPCWriterOptions.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,11 @@

package ai.rapids.cudf;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
* Settings for writing Arrow IPC data.
*/
Expand All @@ -34,11 +39,13 @@ public interface DoneOnGpu {

private final long size;
private final DoneOnGpu callback;
private final List<ColumnMetadata> columnMeta;

/**
 * Builds the immutable options object from the state collected by the builder.
 *
 * @param builder the builder holding the chunk size, callback and column metadata
 */
private ArrowIPCWriterOptions(Builder builder) {
  super(builder);
  this.size = builder.size;
  this.callback = builder.callback;
  // Take a snapshot of the metadata list: the builder keeps appending to its own list in
  // withColumnMetadata(), so sharing the reference would let a reused builder silently
  // change options that were already built.
  this.columnMeta = new ArrayList<>(builder.columnMeta);
}

public long getMaxChunkSize() {
Expand All @@ -49,9 +56,23 @@ public DoneOnGpu getCallback() {
return callback;
}

/**
 * Returns the metadata describing each column to write.
 *
 * When no column metadata was supplied, one name-only {@code ColumnMetadata} is built per
 * entry of {@code getColumnNames()}.
 */
public List<ColumnMetadata> getColumnMetadata() {
  final boolean hasExplicitMeta = columnMeta != null && !columnMeta.isEmpty();
  if (hasExplicitMeta) {
    return columnMeta;
  }
  // This is for compatibility. Try building from column names when column meta is empty.
  // Should remove this once all the callers update to use only column metadata.
  final List<ColumnMetadata> fromNames = new ArrayList<>();
  for (String colName : getColumnNames()) {
    fromNames.add(new ColumnMetadata(colName));
  }
  return fromNames;
}

public static class Builder extends WriterBuilder<Builder> {
private long size = -1;
private DoneOnGpu callback = (ignored) -> {};
private List<ColumnMetadata> columnMeta = new ArrayList<>();

public Builder withMaxChunkSize(long size) {
this.size = size;
Expand All @@ -67,6 +88,18 @@ public Builder withCallback(DoneOnGpu callback) {
return this;
}

/**
 * Appends the given column metadata, in order, to the metadata already collected.
 *
 * Use this instead of `withColumnNames` when there are children columns of struct type.
 *
 * (`columnNullability` is not used by the Arrow IPC writer, so it is fine to ignore it here.
 * It can be placed into `ColumnMetadata` if it is needed in the future.)
 *
 * @param columnMeta metadata of the columns to append
 * @return this builder
 */
public Builder withColumnMetadata(ColumnMetadata... columnMeta) {
  for (ColumnMetadata meta : columnMeta) {
    this.columnMeta.add(meta);
  }
  return this;
}

/**
 * Creates an {@code ArrowIPCWriterOptions} from the settings collected by this builder.
 *
 * @return the new options object
 */
public ArrowIPCWriterOptions build() {
return new ArrowIPCWriterOptions(this);
}
Expand Down
77 changes: 77 additions & 0 deletions java/src/main/java/ai/rapids/cudf/ColumnMetadata.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package ai.rapids.cudf;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
* Detailed meta data information for arrow array.
*
* (This is analogous to the native `column_metadata`.)
*/
public class ColumnMetadata {
private String name;
private List<ColumnMetadata> children = new ArrayList<>();

public ColumnMetadata(final String colName) {
this.name = colName;
}

public ColumnMetadata addChildren(ColumnMetadata... childrenMeta) {
children.addAll(Arrays.asList(childrenMeta));
return this;
}

/**
* returns a <code>cudf::column_metadata *</code> cast to a long. We don't want to force
* users to close a ColumnMetadata. Because of the ColumnMetadata objects are created in
* pure java, but when it is time to use them this method is called to return a pointer to
* the c++ column_metadata instance. All values returned by this can be used multiple times,
* and should be closed by calling the static close method. Yes, this creates a lot more JNI
* calls, but it keeps the user API clean.
*/
long createNativeInstance() throws CudfException {
long[] childrenHandles = createNativeInstances(children);
try {
return create(name, childrenHandles);
} finally {
close(childrenHandles);
}
}

static void close(long[] metaHandles) throws CudfException {
if (metaHandles == null) {
return;
}
for (long h : metaHandles) {
close(h);
}
}

static long[] createNativeInstances(List<ColumnMetadata> metadataList) {
return metadataList.stream()
.mapToLong(ColumnMetadata::createNativeInstance)
.toArray();
jlowe marked this conversation as resolved.
Show resolved Hide resolved
}

private static native void close(long metaHandle) throws CudfException;
private static native long create(final String name, long[] children) throws CudfException;
}
26 changes: 16 additions & 10 deletions java/src/main/java/ai/rapids/cudf/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -365,19 +365,19 @@ private static native long writeORCBufferBegin(String[] columnNames,

/**
* Setup everything to write Arrow IPC formatted data to a file.
* @param columnNames names that correspond to the table columns
* @param columnsMeta handles of the native column metadata that corresponds to the table columns
* @param filename local output path
* @return a handle that is used in later calls to writeArrowIPCChunk and writeArrowIPCEnd.
*/
private static native long writeArrowIPCFileBegin(String[] columnNames, String filename);
private static native long writeArrowIPCFileBegin(long[] columnsMeta, String filename);

/**
* Setup everything to write Arrow IPC formatted data to a buffer.
* @param columnNames names that correspond to the table columns
* @param columnsMeta handles of the native column metadata that corresponds to the table columns
* @param consumer consumer of host buffers produced.
* @return a handle that is used in later calls to writeArrowIPCChunk and writeArrowIPCEnd.
*/
private static native long writeArrowIPCBufferBegin(String[] columnNames,
private static native long writeArrowIPCBufferBegin(long[] columnsMeta,
HostBufferConsumer consumer);

/**
Expand Down Expand Up @@ -987,19 +987,25 @@ private ArrowIPCTableWriter(ArrowIPCWriterOptions options,
this.callback = options.getCallback();
this.consumer = null;
this.maxChunkSize = options.getMaxChunkSize();
this.handle = writeArrowIPCFileBegin(
options.getColumnNames(),
outputFile.getAbsolutePath());
long[] metaHandles = ColumnMetadata.createNativeInstances(options.getColumnMetadata());
try {
this.handle = writeArrowIPCFileBegin(metaHandles, outputFile.getAbsolutePath());
} finally {
ColumnMetadata.close(metaHandles);
}
}

private ArrowIPCTableWriter(ArrowIPCWriterOptions options,
HostBufferConsumer consumer) {
this.callback = options.getCallback();
this.consumer = consumer;
this.maxChunkSize = options.getMaxChunkSize();
this.handle = writeArrowIPCBufferBegin(
options.getColumnNames(),
consumer);
long[] metaHandles = ColumnMetadata.createNativeInstances(options.getColumnMetadata());
try {
this.handle = writeArrowIPCBufferBegin(metaHandles, consumer);
} finally {
ColumnMetadata.close(metaHandles);
}
}

@Override
Expand Down
1 change: 1 addition & 0 deletions java/src/main/native/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ set(SOURCE_FILES
"src/AggregationJni.cpp"
"src/CudfJni.cpp"
"src/CudaJni.cpp"
"src/ColumnMetadataJni.cpp"
"src/ColumnVectorJni.cpp"
"src/ColumnViewJni.cpp"
"src/ContiguousTableJni.cpp"
Expand Down
58 changes: 58 additions & 0 deletions java/src/main/native/src/ColumnMetadataJni.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <memory>
#include <string>

#include <cudf/interop.hpp>

#include "jni_utils.hpp"

extern "C" {

/**
 * JNI entry point for ColumnMetadata.close(long): deletes the native column_metadata
 * instance behind the given handle. A zero handle is rejected by the null check.
 */
JNIEXPORT void JNICALL Java_ai_rapids_cudf_ColumnMetadata_close(JNIEnv *env,
                                                                jclass,
                                                                jlong j_handle) {
  JNI_NULL_CHECK(env, j_handle, "column metadata handle is null", );
  try {
    delete reinterpret_cast<cudf::column_metadata *>(j_handle);
  }
  CATCH_STD(env, );
}

/**
 * JNI entry point for ColumnMetadata.create(String, long[]): builds a native column_metadata
 * with the given name and a copy of every child referenced by `j_children`.
 *
 * The children are copied, so the caller keeps ownership of the child handles. The returned
 * handle must be released through Java_ai_rapids_cudf_ColumnMetadata_close.
 */
JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_ColumnMetadata_create(JNIEnv *env,
                                                                  jclass,
                                                                  jstring j_name,
                                                                  jlongArray j_children) {
  try {
    // No need to set device since no GPU ops here.
    cudf::jni::native_jstring col_name(env, j_name);
    cudf::jni::native_jlongArray meta_children(env, j_children);
    // Create a meta with empty name if `col_name` is NULL.
    auto name = std::string(col_name.is_null() ? "" : col_name.get());
    // Owned by a unique_ptr until it is handed to Java, so it is released if copying a child
    // throws or a child handle turns out to be null.
    std::unique_ptr<cudf::column_metadata> cm(new cudf::column_metadata(name));
    if (!meta_children.is_null()) {
      // add the children
      for (int i = 0; i < meta_children.size(); i++) {
        JNI_NULL_CHECK(env, meta_children[i], "child column metadata handle is null", 0);
        auto child = reinterpret_cast<cudf::column_metadata const *>(meta_children[i]);
        // copy to `this`.
        cm->children_meta.push_back(*child);
      }
    }
    return reinterpret_cast<jlong>(cm.release());
  }
  CATCH_STD(env, 0);
}

} // extern "C"
46 changes: 26 additions & 20 deletions java/src/main/native/src/TableJni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,16 +203,16 @@ typedef jni_table_writer_handle<cudf::io::orc_chunked_writer> native_orc_writer_

class native_arrow_ipc_writer_handle final {
public:
explicit native_arrow_ipc_writer_handle(const std::vector<std::string> &col_names,
explicit native_arrow_ipc_writer_handle(const std::vector<cudf::column_metadata> &col_meta,
const std::string &file_name)
: initialized(false), column_names(col_names), file_name(file_name) {}
: initialized(false), column_metadata(col_meta), file_name(file_name) {}

explicit native_arrow_ipc_writer_handle(const std::vector<std::string> &col_names,
explicit native_arrow_ipc_writer_handle(const std::vector<cudf::column_metadata> &col_meta,
const std::shared_ptr<arrow::io::OutputStream> &sink)
: initialized(false), column_names(col_names), file_name(""), sink(sink) {}
: initialized(false), column_metadata(col_meta), file_name(""), sink(sink) {}

bool initialized;
std::vector<std::string> column_names;
std::vector<cudf::column_metadata> column_metadata;
std::string file_name;
std::shared_ptr<arrow::io::OutputStream> sink;
std::shared_ptr<arrow::ipc::RecordBatchWriter> writer;
Expand Down Expand Up @@ -563,6 +563,16 @@ bool valid_window_parameters(native_jintArray const &values,
values.size() == preceding.size() && values.size() == following.size();
}

static void build_column_metadata_from_handle(JNIEnv *env, jlongArray j_meta_handles,
jlowe marked this conversation as resolved.
Show resolved Hide resolved
std::vector<cudf::column_metadata>& out_meta) {
cudf::jni::native_jlongArray meta_handles(env, j_meta_handles);
for (int i = 0; i < meta_handles.size(); i++) {
jlowe marked this conversation as resolved.
Show resolved Hide resolved
cudf::column_metadata *cm = reinterpret_cast<cudf::column_metadata *>(meta_handles[i]);
// copy to `out_meta`.
out_meta.push_back(*cm);
}
}

} // namespace

} // namespace jni
Expand Down Expand Up @@ -1196,36 +1206,37 @@ JNIEXPORT void JNICALL Java_ai_rapids_cudf_Table_writeORCEnd(JNIEnv *env, jclass
}

JNIEXPORT long JNICALL Java_ai_rapids_cudf_Table_writeArrowIPCBufferBegin(JNIEnv *env, jclass,
jobjectArray j_col_names,
jlongArray j_col_meta,
jobject consumer) {
JNI_NULL_CHECK(env, j_col_names, "null columns", 0);
JNI_NULL_CHECK(env, j_col_meta, "null columns", 0);
JNI_NULL_CHECK(env, consumer, "null consumer", 0);
try {
cudf::jni::auto_set_device(env);
cudf::jni::native_jstringArray col_names(env, j_col_names);

std::shared_ptr<cudf::jni::jni_arrow_output_stream> data_sink(
new cudf::jni::jni_arrow_output_stream(env, consumer));
std::vector<cudf::column_metadata> col_meta;
cudf::jni::build_column_metadata_from_handle(env, j_col_meta, col_meta);

cudf::jni::native_arrow_ipc_writer_handle *ret =
new cudf::jni::native_arrow_ipc_writer_handle(col_names.as_cpp_vector(), data_sink);
new cudf::jni::native_arrow_ipc_writer_handle(col_meta, data_sink);
return reinterpret_cast<jlong>(ret);
}
CATCH_STD(env, 0)
}

JNIEXPORT long JNICALL Java_ai_rapids_cudf_Table_writeArrowIPCFileBegin(JNIEnv *env, jclass,
jobjectArray j_col_names,
jlongArray j_col_meta,
jstring j_output_path) {
JNI_NULL_CHECK(env, j_col_names, "null columns", 0);
JNI_NULL_CHECK(env, j_col_meta, "null columns", 0);
JNI_NULL_CHECK(env, j_output_path, "null output path", 0);
try {
cudf::jni::auto_set_device(env);
cudf::jni::native_jstringArray col_names(env, j_col_names);
cudf::jni::native_jstring output_path(env, j_output_path);
std::vector<cudf::column_metadata> col_meta;
cudf::jni::build_column_metadata_from_handle(env, j_col_meta, col_meta);

cudf::jni::native_arrow_ipc_writer_handle *ret =
new cudf::jni::native_arrow_ipc_writer_handle(col_names.as_cpp_vector(), output_path.get());
new cudf::jni::native_arrow_ipc_writer_handle(col_meta, output_path.get());
return reinterpret_cast<jlong>(ret);
}
CATCH_STD(env, 0)
Expand All @@ -1245,12 +1256,7 @@ JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Table_convertCudfToArrowTable(JNIEnv
cudf::jni::auto_set_device(env);
std::unique_ptr<std::shared_ptr<arrow::Table>> result(
new std::shared_ptr<arrow::Table>(nullptr));
auto column_metadata = std::vector<cudf::column_metadata>{};
column_metadata.reserve(state->column_names.size());
std::transform(std::begin(state->column_names), std::end(state->column_names),
std::back_inserter(column_metadata),
[](auto const &column_name) { return cudf::column_metadata{column_name}; });
*result = cudf::to_arrow(*tview, column_metadata);
*result = cudf::to_arrow(*tview, state->column_metadata);
if (!result->get()) {
return 0;
}
Expand Down
Loading