chore: Remove dead code #1155

Merged: 1 commit, Dec 10, 2024
2 changes: 0 additions & 2 deletions native/core/benches/bloom_filter_agg.rs
@@ -61,10 +61,8 @@ fn criterion_benchmark(c: &mut Criterion) {
group.bench_function(agg_mode.0, |b| {
let comet_bloom_filter_agg =
Arc::new(AggregateUDF::new_from_impl(BloomFilterAgg::new(
Arc::clone(&c0),
Arc::clone(&num_items),
Arc::clone(&num_bits),
"bloom_filter_agg",
DataType::Binary,
)));
b.to_async(&rt).iter(|| {
5 changes: 2 additions & 3 deletions native/core/benches/parquet_read.rs
@@ -44,7 +44,7 @@ fn bench(c: &mut Criterion) {
let mut group = c.benchmark_group("comet_parquet_read");
let schema = build_test_schema();

let pages = build_plain_int32_pages(schema.clone(), schema.column(0), 0.0);
let pages = build_plain_int32_pages(schema.column(0), 0.0);
group.bench_function("INT/PLAIN/NOT_NULL", |b| {
let t = TypePtr::new(
PrimitiveTypeBuilder::new("f", PhysicalType::INT32)
@@ -107,7 +107,6 @@ const VALUES_PER_PAGE: usize = 10_000;
const BATCH_SIZE: usize = 4096;

fn build_plain_int32_pages(
schema: SchemaDescPtr,
column_desc: ColumnDescPtr,
null_density: f32,
) -> impl PageIterator + Clone {
@@ -143,7 +142,7 @@ fn build_plain_int32_pages(

// Since `InMemoryPageReader` is not exposed from the parquet crate, we use
// `InMemoryPageIterator` instead, which is an Iter<Iter<Page>>.
InMemoryPageIterator::new(schema, column_desc, vec![pages])
InMemoryPageIterator::new(vec![pages])
}

struct TestColumnReader {
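With the unused `schema` and `column_desc` parameters gone, constructing the iterator needs only the pages. A minimal sketch of the new call shape (assuming the `InMemoryPageIterator` test utility from the parquet crate as used above; `page_batch` is a hypothetical name for one row group's pages):

// One inner Vec<Page> per row group; the iterator yields one page reader per row group.
let row_groups: Vec<Vec<Page>> = vec![page_batch];
let page_iter = InMemoryPageIterator::new(row_groups);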
44 changes: 1 addition & 43 deletions native/core/src/errors.rs
@@ -485,23 +485,6 @@ where
|| f(t)
}

// This is a duplicate of `try_unwrap_or_throw`; it exists to work around Arrow's lack of
// `UnwindSafe` handling.
pub fn try_assert_unwind_safe_or_throw<T, F>(env: &JNIEnv, f: F) -> T
where
T: JNIDefault,
F: FnOnce(JNIEnv) -> Result<T, CometError>,
{
let mut env1 = unsafe { JNIEnv::from_raw(env.get_raw()).unwrap() };
let env2 = unsafe { JNIEnv::from_raw(env.get_raw()).unwrap() };
unwrap_or_throw_default(
&mut env1,
flatten(
catch_unwind(std::panic::AssertUnwindSafe(curry(f, env2))).map_err(CometError::from),
),
)
}

// It is currently undefined behavior to unwind from Rust code into foreign code, so we can wrap
// our JNI functions and turn these panics into a `RuntimeException`.
pub fn try_unwrap_or_throw<T, F>(env: &JNIEnv, f: F) -> T
@@ -534,10 +517,7 @@ mod tests {
AttachGuard, InitArgsBuilder, JNIEnv, JNIVersion, JavaVM,
};

use assertables::{
assert_contains, assert_contains_as_result, assert_starts_with,
assert_starts_with_as_result,
};
use assertables::{assert_starts_with, assert_starts_with_as_result};

pub fn jvm() -> &'static Arc<JavaVM> {
static mut JVM: Option<Arc<JavaVM>> = None;
@@ -890,26 +870,4 @@ mod tests {
// first line.
assert_starts_with!(msg_rust, expected_message);
}

// Asserts that the exception's message starts with `expected_message` and contains
// `stacktrace_contains`.
fn assert_exception_message_with_stacktrace(
env: &mut JNIEnv,
exception: JThrowable,
expected_message: &str,
stacktrace_contains: &str,
) {
let message = env
.call_method(exception, "getMessage", "()Ljava/lang/String;", &[])
.unwrap()
.l()
.unwrap();
let message_string = message.into();
let msg_rust: String = env.get_string(&message_string).unwrap().into();
// Since panics result in multi-line messages which include the backtrace, just use the
// first line.
assert_starts_with!(msg_rust, expected_message);

// Check that the stacktrace is included by checking for a specific element
assert_contains!(msg_rust, stacktrace_contains);
}
}
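As context for the `try_unwrap_or_throw` comment above, the usual pattern is to catch a panic at the FFI boundary and convert it into a thrown Java exception rather than unwinding into the JVM. A minimal standalone sketch (hypothetical helper, not the file's actual implementation; the real code throws via JNI where the comment indicates):

use std::panic::{catch_unwind, AssertUnwindSafe};

fn run_without_unwinding<T: Default>(f: impl FnOnce() -> T) -> T {
    match catch_unwind(AssertUnwindSafe(f)) {
        Ok(value) => value,
        Err(_panic_payload) => {
            // In the real code, a java/lang/RuntimeException is thrown through
            // JNI here; returning a default stands in for the JVM taking over.
            T::default()
        }
    }
}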
@@ -34,9 +34,7 @@ use datafusion_physical_expr::expressions::Literal;

#[derive(Debug, Clone)]
pub struct BloomFilterAgg {
name: String,
signature: Signature,
expr: Arc<dyn PhysicalExpr>,
num_items: i32,
num_bits: i32,
}
@@ -53,15 +51,12 @@ fn extract_i32_from_literal(expr: Arc<dyn PhysicalExpr>) -> i32 {

impl BloomFilterAgg {
pub fn new(
expr: Arc<dyn PhysicalExpr>,
num_items: Arc<dyn PhysicalExpr>,
num_bits: Arc<dyn PhysicalExpr>,
name: impl Into<String>,
data_type: DataType,
) -> Self {
assert!(matches!(data_type, DataType::Binary));
Self {
name: name.into(),
signature: Signature::uniform(
1,
vec![
@@ -73,7 +68,6 @@ impl BloomFilterAgg {
],
Volatility::Immutable,
),
expr,
num_items: extract_i32_from_literal(num_items),
num_bits: extract_i32_from_literal(num_bits),
}
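For context (a sketch under stated assumptions, not part of this diff): the removed `name` field is redundant because the UDF reports its name through its trait implementation (presumably DataFusion's `AggregateUDFImpl`), and the removed `expr` travels with the aggregate's argument expressions instead. The slimmed constructor is then called as at the sites shown in the benchmark above and the planner below:

// Hypothetical usage, mirroring those call sites.
let udf = AggregateUDF::new_from_impl(BloomFilterAgg::new(
    Arc::clone(&num_items),
    Arc::clone(&num_bits),
    DataType::Binary,
));
assert_eq!(udf.name(), "bloom_filter_agg"); // name comes from the trait, not a struct field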
3 changes: 0 additions & 3 deletions native/core/src/execution/datafusion/planner.rs
@@ -115,7 +115,6 @@ use std::cmp::max;
use std::{collections::HashMap, sync::Arc};

// For clippy error on type_complexity.
type ExecResult<T> = Result<T, ExecutionError>;
type PhyAggResult = Result<Vec<AggregateFunctionExpr>, ExecutionError>;
type PhyExprResult = Result<Vec<(Arc<dyn PhysicalExpr>, String)>, ExecutionError>;
type PartitionPhyExprResult = Result<Vec<Arc<dyn PhysicalExpr>>, ExecutionError>;
@@ -1758,10 +1757,8 @@ impl PhysicalPlanner {
self.create_expr(expr.num_bits.as_ref().unwrap(), Arc::clone(&schema))?;
let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap());
let func = AggregateUDF::new_from_impl(BloomFilterAgg::new(
Arc::clone(&child),
Arc::clone(&num_items),
Arc::clone(&num_bits),
"bloom_filter_agg",
datatype,
));
Self::create_aggr_func_expr("bloom_filter_agg", schema, vec![child], func)
@@ -70,6 +70,7 @@ impl SparkBitArray {
self.data.len()
}

#[allow(dead_code)] // this is only called from tests
pub fn cardinality(&self) -> usize {
self.bit_count
}
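A brief note on the annotation above (a sketch with hypothetical names): `#[allow(dead_code)]` keeps the method compiled in every build profile and merely silences the lint, whereas gating it with `#[cfg(test)]` would compile it out of non-test builds entirely.

pub struct Flags {
    bits: u64,
}

impl Flags {
    // Compiled everywhere; the lint is silenced because only tests call it.
    #[allow(dead_code)]
    pub fn popcount(&self) -> u32 {
        self.bits.count_ones()
    }
}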
8 changes: 0 additions & 8 deletions native/core/src/execution/jni_api.rs
@@ -207,14 +207,6 @@ fn prepare_datafusion_session_context(
Ok(session_ctx)
}

fn parse_bool(conf: &HashMap<String, String>, name: &str) -> CometResult<bool> {
conf.get(name)
.map(String::as_str)
.unwrap_or("false")
.parse::<bool>()
.map_err(|e| CometError::Config(format!("Failed to parse boolean config {name}: {e}")))
}

/// Prepares arrow arrays for output.
fn prepare_output(
env: &mut JNIEnv,
117 changes: 1 addition & 116 deletions native/core/src/execution/kernels/strings.rs
@@ -21,7 +21,7 @@ use std::sync::Arc;

use arrow::{
array::*,
buffer::{Buffer, MutableBuffer},
buffer::MutableBuffer,
compute::kernels::substring::{substring as arrow_substring, substring_by_char},
datatypes::{DataType, Int32Type},
};
@@ -87,43 +87,6 @@ pub fn substring(array: &dyn Array, start: i64, length: u64) -> Result<ArrayRef,
}
}

/// Returns an ArrayRef whose rows are substrings of `array`, selected by the per-row `start`
/// and `length` values.
///
/// # Preconditions
///
/// - `start` can be negative, in which case the start counts from the end of the string.
/// - `array` must be either [`StringArray`] or [`LargeStringArray`].
///
/// Note: this is different from the arrow-rs `substring` kernel in that both `start` and `length`
/// are `Int32Array` here.
pub fn substring_with_array(
array: &dyn Array,
start: &Int32Array,
length: &Int32Array,
) -> ArrayRef {
match array.data_type() {
DataType::LargeUtf8 => generic_substring(
array
.as_any()
.downcast_ref::<LargeStringArray>()
.expect("A large string is expected"),
start,
length,
|i| i as i64,
),
DataType::Utf8 => generic_substring(
array
.as_any()
.downcast_ref::<StringArray>()
.expect("A string is expected"),
start,
length,
|i| i,
),
_ => panic!("substring does not support type {:?}", array.data_type()),
}
}
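To illustrate the documented negative-`start` semantics (a sketch, not part of the diff; the call itself is shown commented since the kernel is removed here):

use arrow::array::{Int32Array, StringArray};

let input = StringArray::from(vec!["hello", "world"]);
let start = Int32Array::from(vec![-3, 1]);
let length = Int32Array::from(vec![2, 3]);
// substring_with_array(&input, &start, &length) would yield ["ll", "orl"]:
// -3 counts back from the end of "hello", so that slice starts at byte 2.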

fn generic_string_space<OffsetSize: OffsetSizeTrait>(length: &Int32Array) -> ArrayRef {
let array_len = length.len();
let mut offsets = MutableBuffer::new((array_len + 1) * std::mem::size_of::<OffsetSize>());
@@ -163,81 +126,3 @@ fn generic_string_space<OffsetSize: OffsetSizeTrait>(length: &Int32Array) -> Arr
};
make_array(data)
}

fn generic_substring<OffsetSize: OffsetSizeTrait, F>(
array: &GenericStringArray<OffsetSize>,
start: &Int32Array,
length: &Int32Array,
f: F,
) -> ArrayRef
where
F: Fn(i32) -> OffsetSize,
{
assert_eq!(array.len(), start.len());
assert_eq!(array.len(), length.len());

// compute current offsets
let offsets = array.to_data().buffers()[0].clone();
let offsets: &[OffsetSize] = offsets.typed_data::<OffsetSize>();

// compute null bitmap (copy)
let null_bit_buffer = array.to_data().nulls().map(|b| b.buffer().clone());

// Gets slices of start and length arrays to access them directly for performance.
let start_data = start.to_data();
let length_data = length.to_data();
let starts = start_data.buffers()[0].typed_data::<i32>();
let lengths = length_data.buffers()[0].typed_data::<i32>();

// compute values
let array_data = array.to_data();
let values = &array_data.buffers()[1];
let data = values.as_slice();

// We have no way to estimate how large the result will be, so start with an empty buffer.
let mut new_values = MutableBuffer::new(0);
let mut new_offsets: Vec<OffsetSize> = Vec::with_capacity(array.len() + 1);

let mut length_so_far = OffsetSize::zero();
new_offsets.push(length_so_far);
(0..array.len()).for_each(|i| {
// the length of this entry
let length_i: OffsetSize = offsets[i + 1] - offsets[i];
// compute where we should start slicing this entry
let start_pos: OffsetSize = f(starts[i]);

let start = offsets[i]
+ if start_pos >= OffsetSize::zero() {
start_pos
} else {
length_i + start_pos
};

let start = start.clamp(offsets[i], offsets[i + 1]);
// compute the length of the slice
let slice_length: OffsetSize = f(lengths[i].max(0)).min(offsets[i + 1] - start);

length_so_far += slice_length;

new_offsets.push(length_so_far);

// we need usize for ranges
let start = start.to_usize().unwrap();
let slice_length = slice_length.to_usize().unwrap();

new_values.extend_from_slice(&data[start..start + slice_length]);
});

let data = unsafe {
ArrayData::new_unchecked(
GenericStringArray::<OffsetSize>::DATA_TYPE,
array.len(),
None,
null_bit_buffer,
0,
vec![Buffer::from_slice_ref(&new_offsets), new_values.into()],
vec![],
)
};
make_array(data)
}
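The per-row offset arithmetic above can be stated in isolation (a sketch over plain i64 byte offsets, applying the same clamping rules as `generic_substring`):

/// Computes the (start, end) byte range of one row's slice, given the row's
/// [begin, end) offsets, the requested start position, and the requested length.
fn slice_range(begin: i64, end: i64, start_pos: i64, length: i64) -> (i64, i64) {
    let len_i = end - begin;
    // A negative start counts from the end of the string.
    let raw = begin + if start_pos >= 0 { start_pos } else { len_i + start_pos };
    let start = raw.clamp(begin, end);
    // The requested length is clamped to what remains in this row.
    let slice_len = length.max(0).min(end - start);
    (start, start + slice_len)
}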
8 changes: 0 additions & 8 deletions native/core/src/execution/operators/scan.rs
@@ -525,12 +525,4 @@ impl InputBatch {

InputBatch::Batch(columns, num_rows)
}

/// Get the number of rows in this batch
fn num_rows(&self) -> usize {
match self {
Self::EOF => 0,
Self::Batch(_, num_rows) => *num_rows,
}
}
}
4 changes: 1 addition & 3 deletions native/core/src/execution/shuffle/list.rs
@@ -28,7 +28,6 @@ use arrow_schema::{DataType, TimeUnit};

pub struct SparkUnsafeArray {
row_addr: i64,
row_size: i32,
num_elements: usize,
element_offset: i64,
}
@@ -45,7 +44,7 @@ impl SparkUnsafeObject for SparkUnsafeArray {

impl SparkUnsafeArray {
/// Creates a `SparkUnsafeArray` which points to the array data at the given address.
pub fn new(addr: i64, size: i32) -> Self {
pub fn new(addr: i64) -> Self {
// Read the number of elements from the first 8 bytes.
let slice: &[u8] = unsafe { std::slice::from_raw_parts(addr as *const u8, 8) };
let num_elements = i64::from_le_bytes(slice.try_into().unwrap());
@@ -60,7 +59,6 @@

Self {
row_addr: addr,
row_size: size,
num_elements: num_elements as usize,
element_offset: addr + Self::get_header_portion_in_bytes(num_elements),
}
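The layout assumed above follows Spark's `UnsafeArrayData`: the first 8 bytes of the buffer hold the element count as a little-endian i64, followed by the null-bit header and the element data. A standalone sketch of that header read (a hypothetical helper using a safe slice instead of a raw pointer):

/// Reads the element count from an UnsafeArrayData buffer; `buf` must hold at
/// least the 8-byte count header.
fn read_num_elements(buf: &[u8]) -> usize {
    let mut header = [0u8; 8];
    header.copy_from_slice(&buf[..8]);
    i64::from_le_bytes(header) as usize
}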
17 changes: 3 additions & 14 deletions native/core/src/execution/shuffle/map.rs
@@ -30,8 +30,6 @@ use arrow_array::builder::{
use arrow_schema::{DataType, FieldRef, Fields, TimeUnit};

pub struct SparkUnsafeMap {
row_addr: i64,
row_size: i32,
pub(crate) keys: SparkUnsafeArray,
pub(crate) values: SparkUnsafeArray,
}
@@ -59,8 +57,8 @@ impl SparkUnsafeMap {
panic!("Negative value size in bytes of map: {}", value_array_size);
}

let keys = SparkUnsafeArray::new(addr + 8, key_array_size as i32);
let values = SparkUnsafeArray::new(addr + 8 + key_array_size, value_array_size);
let keys = SparkUnsafeArray::new(addr + 8);
let values = SparkUnsafeArray::new(addr + 8 + key_array_size);

if keys.get_num_elements() != values.get_num_elements() {
panic!(
Expand All @@ -70,16 +68,7 @@ impl SparkUnsafeMap {
);
}

Self {
row_addr: addr,
row_size: size,
keys,
values,
}
}

pub(crate) fn get_num_elements(&self) -> usize {
self.keys.get_num_elements()
Self { keys, values }
}
}

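For reference, the layout consumed by `SparkUnsafeMap::new` mirrors Spark's `UnsafeMapData`: 8 bytes holding the key array's size in bytes, then the key array, then the value array. A sketch of the base-address arithmetic (a hypothetical helper; same unsafe read as the constructor above):

/// Returns the (keys_addr, values_addr) pair for a map whose data starts at `addr`.
fn map_array_addrs(addr: i64) -> (i64, i64) {
    // First 8 bytes: byte size of the key array, little-endian.
    let slice: &[u8] = unsafe { std::slice::from_raw_parts(addr as *const u8, 8) };
    let key_array_size = i64::from_le_bytes(slice.try_into().unwrap());
    (addr + 8, addr + 8 + key_array_size)
}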