Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 43 additions & 25 deletions native/core/src/execution/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,6 @@
//! Define JNI APIs which can be called from Java/Scala.

use super::{serde, utils::SparkArrowConvert};
use arrow::array::RecordBatch;
use arrow::datatypes::DataType as ArrowDataType;
use datafusion::execution::memory_pool::MemoryPool;
use datafusion::{
execution::{disk_manager::DiskManagerBuilder, runtime_env::RuntimeEnv},
physical_plan::{display::DisplayableExecutionPlan, SendableRecordBatchStream},
prelude::{SessionConfig, SessionContext},
};
use futures::poll;
use jni::{
errors::Result as JNIResult,
objects::{
JByteArray, JClass, JIntArray, JLongArray, JObject, JObjectArray, JPrimitiveArray, JString,
ReleaseMode,
},
sys::{jbyteArray, jint, jlong, jlongArray},
JNIEnv,
};
use std::path::PathBuf;
use std::time::{Duration, Instant};
use std::{sync::Arc, task::Poll};

use crate::{
errors::{try_unwrap_or_throw, CometError, CometResult},
execution::{
Expand All @@ -48,19 +26,41 @@ use crate::{
},
jvm_bridge::{jni_new_global_ref, JVMClasses},
};
use arrow::array::{Array, RecordBatch, UInt32Array};
use arrow::compute::{take, TakeOptions};
use arrow::datatypes::DataType as ArrowDataType;
use datafusion::common::ScalarValue;
use datafusion::execution::disk_manager::DiskManagerMode;
use datafusion::execution::memory_pool::MemoryPool;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::logical_expr::ScalarUDF;
use datafusion::{
execution::{disk_manager::DiskManagerBuilder, runtime_env::RuntimeEnv},
physical_plan::{display::DisplayableExecutionPlan, SendableRecordBatchStream},
prelude::{SessionConfig, SessionContext},
};
use datafusion_comet_proto::spark_operator::Operator;
use datafusion_spark::function::math::expm1::SparkExpm1;
use futures::poll;
use futures::stream::StreamExt;
use jni::objects::JByteBuffer;
use jni::sys::JNI_FALSE;
use jni::{
errors::Result as JNIResult,
objects::{
JByteArray, JClass, JIntArray, JLongArray, JObject, JObjectArray, JPrimitiveArray, JString,
ReleaseMode,
},
sys::{jbyteArray, jint, jlong, jlongArray},
JNIEnv,
};
use jni::{
objects::GlobalRef,
sys::{jboolean, jdouble, jintArray, jobjectArray, jstring},
};
use std::path::PathBuf;
use std::time::{Duration, Instant};
use std::{sync::Arc, task::Poll};
use tokio::runtime::Runtime;

use crate::execution::memory_pools::{
Expand Down Expand Up @@ -341,10 +341,28 @@ fn prepare_output(
let mut i = 0;
while i < results.len() {
let array_ref = results.get(i).ok_or(CometError::IndexOutOfBounds(i))?;
array_ref
.to_data()
.move_to_spark(array_addrs[i], schema_addrs[i])?;

if array_ref.offset() != 0 {
// https://github.com/apache/datafusion-comet/issues/2051
// There is a bug with FFI for arrays that have a non-zero offset, so `take` rows
// 0..num_rows into a new array, which will have an offset of 0.
// We expect this to be a cold code path, hence `check_bounds: true` and the `assert_eq!`.
let indices = UInt32Array::from((0..num_rows as u32).collect::<Vec<u32>>());
let new_array = take(
array_ref,
&indices,
Some(TakeOptions { check_bounds: true }),
)?;

assert_eq!(new_array.offset(), 0);

new_array
.to_data()
.move_to_spark(array_addrs[i], schema_addrs[i])?;
} else {
array_ref
.to_data()
.move_to_spark(array_addrs[i], schema_addrs[i])?;
}
i += 1;
}
}
Expand Down
Loading