Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Special-case filter single array record batch in filter_record_batch #235

Merged
merged 3 commits into from
Jul 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions benches/filter_kernels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@
// under the License.
extern crate arrow2;

use std::sync::Arc;

use arrow2::array::*;
use arrow2::compute::filter::{build_filter, filter, Filter};
use arrow2::datatypes::DataType;
use arrow2::compute::filter::{build_filter, filter, filter_record_batch, Filter};
use arrow2::datatypes::{DataType, Field, Schema};
use arrow2::record_batch::RecordBatch;
use arrow2::util::bench_util::*;

use criterion::{criterion_group, criterion_main, Criterion};
Expand Down Expand Up @@ -120,6 +123,17 @@ fn add_benchmark(c: &mut Criterion) {
c.bench_function("filter context string low selectivity", |b| {
b.iter(|| bench_built_filter(&sparse_filter, &data_array))
});

let data_array = create_primitive_array::<f32>(size, DataType::Float32, 0.0);

let field = Field::new("c1", data_array.data_type().clone(), true);
let schema = Schema::new(vec![field]);

let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data_array)]).unwrap();

c.bench_function("filter single record batch", |b| {
b.iter(|| filter_record_batch(&batch, &filter_array))
});
}

criterion_group!(benches, add_benchmark);
Expand Down
23 changes: 16 additions & 7 deletions src/compute/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,14 +254,23 @@ pub fn filter(array: &dyn Array, filter: &BooleanArray) -> Result<Box<dyn Array>
/// Therefore, it is considered undefined behavior to pass `filter_values` with null values.
pub fn filter_record_batch(
record_batch: &RecordBatch,
filter: &BooleanArray,
filter_values: &BooleanArray,
) -> Result<RecordBatch> {
let filter = build_filter(filter)?;
let filtered_arrays = record_batch
.columns()
.iter()
.map(|a| filter(a.as_ref()).into())
.collect();
let num_colums = record_batch.columns().len();

let filtered_arrays = match num_colums {
1 => {
vec![filter(record_batch.columns()[0].as_ref(), filter_values)?.into()]
}
_ => {
let filter = build_filter(filter_values)?;
record_batch
.columns()
.iter()
.map(|a| filter(a.as_ref()).into())
.collect()
}
};
RecordBatch::try_new(record_batch.schema().clone(), filtered_arrays)
}

Expand Down