Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid unnecessary branching in row read/write if schema is null-free #1891

Merged
merged 3 commits into from
Mar 1, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions datafusion/benches/jit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ extern crate datafusion;
mod data_utils;
use crate::criterion::Criterion;
use crate::data_utils::{create_record_batches, create_schema};
use datafusion::row::writer::{
bench_write_batch, bench_write_batch_jit, bench_write_batch_jit_dummy,
};
use datafusion::row::writer::{bench_write_batch, bench_write_batch_jit};
use std::sync::Arc;

fn criterion_benchmark(c: &mut Criterion) {
Expand All @@ -48,10 +46,6 @@ fn criterion_benchmark(c: &mut Criterion) {
criterion::black_box(bench_write_batch_jit(&batches, schema.clone()).unwrap())
})
});

c.bench_function("row serializer jit codegen only", |b| {
b.iter(|| bench_write_batch_jit_dummy(schema.clone()).unwrap())
});
}

criterion_group!(benches, criterion_benchmark);
Expand Down
80 changes: 78 additions & 2 deletions datafusion/src/row/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,11 @@ fn fn_name<T>(f: T) -> &'static str {
}
}

/// Returns `true` when none of the fields in `schema` is declared nullable.
///
/// A schema without any fields is reported as null-free as well.
pub fn schema_null_free(schema: &Arc<Schema>) -> bool {
    !schema.fields().iter().any(|field| field.is_nullable())
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -323,7 +328,7 @@ mod tests {
#[test]
#[allow(non_snake_case)]
fn [<test_single_ $TYPE>]() -> Result<()> {
let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, false)]));
let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, true)]));
let a = $ARRAY::from($VEC);
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
let mut vector = vec![0; 1024];
Expand All @@ -349,6 +354,38 @@ mod tests {
assert_eq!(batch, output_batch);
Ok(())
}

#[test]
#[allow(non_snake_case)]
fn [<test_single_ $TYPE _nf>]() -> Result<()> {
yjshen marked this conversation as resolved.
Show resolved Hide resolved
let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, false)]));
let v = $VEC.into_iter().filter(|o| o.is_some()).collect::<Vec<_>>();
let a = $ARRAY::from(v);
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
let mut vector = vec![0; 1024];
let row_offsets =
{ write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) };
let output_batch = { read_as_batch(&vector, schema, row_offsets)? };
assert_eq!(batch, output_batch);
Ok(())
}

#[test]
#[allow(non_snake_case)]
#[cfg(feature = "jit")]
fn [<test_single_ $TYPE _jit_nf>]() -> Result<()> {
yjshen marked this conversation as resolved.
Show resolved Hide resolved
let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, false)]));
let v = $VEC.into_iter().filter(|o| o.is_some()).collect::<Vec<_>>();
let a = $ARRAY::from(v);
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
let mut vector = vec![0; 1024];
let assembler = Assembler::default();
let row_offsets =
{ write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler)? };
let output_batch = { read_as_batch_jit(&vector, schema, row_offsets, &assembler)? };
assert_eq!(batch, output_batch);
Ok(())
}
}
};
}
Expand Down Expand Up @@ -439,7 +476,7 @@ mod tests {

#[test]
fn test_single_binary() -> Result<()> {
let schema = Arc::new(Schema::new(vec![Field::new("a", Binary, false)]));
let schema = Arc::new(Schema::new(vec![Field::new("a", Binary, true)]));
yjshen marked this conversation as resolved.
Show resolved Hide resolved
let values: Vec<Option<&[u8]>> =
vec![Some(b"one"), Some(b"two"), None, Some(b""), Some(b"three")];
let a = BinaryArray::from_opt_vec(values);
Expand Down Expand Up @@ -478,6 +515,45 @@ mod tests {
Ok(())
}

#[test]
fn test_single_binary_nf() -> Result<()> {
yjshen marked this conversation as resolved.
Show resolved Hide resolved
let schema = Arc::new(Schema::new(vec![Field::new("a", Binary, false)]));
let values: Vec<&[u8]> = vec![b"one", b"two", b"", b"three"];
let a = BinaryArray::from_vec(values);
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
let mut vector = vec![0; 8192];
let row_offsets =
{ write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) };
let output_batch = { read_as_batch(&vector, schema, row_offsets)? };
assert_eq!(batch, output_batch);
Ok(())
}

#[test]
#[cfg(feature = "jit")]
fn test_single_binary_jit_nf() -> Result<()> {
yjshen marked this conversation as resolved.
Show resolved Hide resolved
let schema = Arc::new(Schema::new(vec![Field::new("a", Binary, false)]));
let values: Vec<&[u8]> = vec![b"one", b"two", b"", b"three"];
let a = BinaryArray::from_vec(values);
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
let mut vector = vec![0; 8192];
let assembler = Assembler::default();
let row_offsets = {
write_batch_unchecked_jit(
&mut vector,
0,
&batch,
0,
schema.clone(),
&assembler,
)?
};
let output_batch =
{ read_as_batch_jit(&vector, schema, row_offsets, &assembler)? };
assert_eq!(batch, output_batch);
Ok(())
}

#[tokio::test]
async fn test_with_parquet() -> Result<()> {
let runtime = Arc::new(RuntimeEnv::default());
Expand Down
44 changes: 31 additions & 13 deletions datafusion/src/row/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ use crate::error::{DataFusionError, Result};
use crate::reg_fn;
#[cfg(feature = "jit")]
use crate::row::fn_name;
use crate::row::{all_valid, get_offsets, supported, NullBitsFormatter};
use crate::row::{
all_valid, get_offsets, schema_null_free, supported, NullBitsFormatter,
};
use arrow::array::*;
use arrow::datatypes::{DataType, Schema};
use arrow::error::Result as ArrowResult;
Expand Down Expand Up @@ -133,32 +135,40 @@ pub struct RowReader<'a> {
/// For fixed length fields, it's where the actual data stores.
/// For variable length fields, it's a pack of (offset << 32 | length) if we use u64.
field_offsets: Vec<usize>,
/// If a row is null free according to its schema
null_free: bool,
}

impl<'a> std::fmt::Debug for RowReader<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let null_bits = self.null_bits();
write!(
f,
"{:?}",
NullBitsFormatter::new(null_bits, self.field_count)
)
if self.null_free {
write!(f, "null_free")
} else {
let null_bits = self.null_bits();
write!(
f,
"{:?}",
NullBitsFormatter::new(null_bits, self.field_count)
)
}
}
}

impl<'a> RowReader<'a> {
/// new
pub fn new(schema: &Arc<Schema>, data: &'a [u8]) -> Self {
assert!(supported(schema));
let null_free = schema_null_free(schema);
let field_count = schema.fields().len();
let null_width = ceil(field_count, 8);
let null_width = if null_free { 0 } else { ceil(field_count, 8) };
let (field_offsets, _) = get_offsets(null_width, schema);
Self {
data,
base_offset: 0,
field_count,
null_width,
field_offsets,
null_free,
}
}

Expand All @@ -174,14 +184,22 @@ impl<'a> RowReader<'a> {

#[inline(always)]
fn null_bits(&self) -> &[u8] {
let start = self.base_offset;
&self.data[start..start + self.null_width]
if self.null_free {
&[]
} else {
let start = self.base_offset;
&self.data[start..start + self.null_width]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If null_width is always zero, I wonder if the check for self.null_free is needed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for the non-null-free code path. Actually, this method shouldn't be touched at all when tuples are null-free.

}
}

#[inline(always)]
fn all_valid(&self) -> bool {
let null_bits = self.null_bits();
all_valid(null_bits, self.field_count)
if self.null_free {
true
} else {
let null_bits = self.null_bits();
all_valid(null_bits, self.field_count)
}
}

fn is_valid_at(&self, idx: usize) -> bool {
Expand Down Expand Up @@ -276,7 +294,7 @@ impl<'a> RowReader<'a> {
}

fn read_row(row: &RowReader, batch: &mut MutableRecordBatch, schema: &Arc<Schema>) {
if row.all_valid() {
if row.null_free || row.all_valid() {
for ((col_idx, to), field) in batch
.arrays
.iter_mut()
Expand Down
71 changes: 46 additions & 25 deletions datafusion/src/row/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,24 @@

//! Reusable row writer backed by Vec<u8> to stitch attributes together

#[cfg(feature = "jit")]
use crate::error::Result;
#[cfg(feature = "jit")]
use crate::reg_fn;
#[cfg(feature = "jit")]
use crate::row::fn_name;
use crate::row::{estimate_row_width, fixed_size, get_offsets, supported};
use crate::row::{
estimate_row_width, fixed_size, get_offsets, schema_null_free, supported,
};
use arrow::array::*;
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
use arrow::util::bit_util::{ceil, round_upto_power_of_2, set_bit_raw, unset_bit_raw};
#[cfg(feature = "jit")]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think over time it would be good to start trying to encapsulate the JIT'd code more (as in, reduce the number of #[cfg(feature = "jit")] attributes) -- perhaps by defining a common interface for creating JIT and non-JIT versions. As I am interested in getting more involved in this project, I would be happy to try and do so (or do it as part of a larger body of work).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, that would be great! Thanks for the offer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll see what I can do over the next day or two

use datafusion_jit::api::CodeBlock;
#[cfg(feature = "jit")]
use datafusion_jit::api::{Assembler, GeneratedFunction};
#[cfg(feature = "jit")]
use datafusion_jit::ast::Expr;
#[cfg(feature = "jit")]
use datafusion_jit::ast::{BOOL, I64, PTR};
Expand Down Expand Up @@ -147,17 +152,6 @@ pub fn bench_write_batch_jit(
Ok(lengths)
}

#[cfg(feature = "jit")]
/// bench code generation cost
pub fn bench_write_batch_jit_dummy(schema: Arc<Schema>) -> Result<()> {
let assembler = Assembler::default();
register_write_functions(&assembler)?;
let gen_func = gen_write_row(&schema, &assembler)?;
let mut jit = assembler.create_jit();
let _: *const u8 = jit.compile(gen_func)?;
Ok(())
}

macro_rules! set_idx {
($WIDTH: literal, $SELF: ident, $IDX: ident, $VALUE: ident) => {{
$SELF.assert_index_valid($IDX);
Expand Down Expand Up @@ -198,14 +192,17 @@ pub struct RowWriter {
/// For fixed length fields, it's where the actual data stores.
/// For variable length fields, it's a pack of (offset << 32 | length) if we use u64.
field_offsets: Vec<usize>,
/// If a row is null free according to its schema
null_free: bool,
}

impl RowWriter {
/// new
pub fn new(schema: &Arc<Schema>) -> Self {
assert!(supported(schema));
let null_free = schema_null_free(schema);
let field_count = schema.fields().len();
let null_width = ceil(field_count, 8);
let null_width = if null_free { 0 } else { ceil(field_count, 8) };
let (field_offsets, values_width) = get_offsets(null_width, schema);
let mut init_capacity = estimate_row_width(null_width, schema);
if !fixed_size(schema) {
Expand All @@ -221,6 +218,7 @@ impl RowWriter {
varlena_width: 0,
varlena_offset: null_width + values_width,
field_offsets,
null_free,
}
}

Expand All @@ -238,13 +236,21 @@ impl RowWriter {
}

/// Marks the field at `idx` as null by clearing its bit in the null bitmap,
/// which occupies the first `null_width` bytes of the row buffer.
///
/// # Panics
///
/// Panics if this writer is null-free, since such a writer reserves no
/// null bitmap (`null_width` is zero).
fn set_null_at(&mut self, idx: usize) {
    assert!(
        !self.null_free,
        "Unexpected call to set_null_at on null-free row writer"
    );
    let bitmap_ptr = self.data[..self.null_width].as_mut_ptr();
    // NOTE(review): soundness relies on `idx` addressing a bit inside the
    // first `null_width` bytes -- confirm callers only pass valid field indices.
    unsafe { unset_bit_raw(bitmap_ptr, idx) };
}

fn set_non_null_at(&mut self, idx: usize) {
assert!(
!self.null_free,
"Unexpected call to set_non_null_at on null-free row writer"
);
let null_bits = &mut self.data[0..self.null_width];
unsafe {
set_bit_raw(null_bits.as_mut_ptr(), idx);
Expand Down Expand Up @@ -333,17 +339,30 @@ impl RowWriter {
/// Stitch attributes of tuple in `batch` at `row_idx` and returns the tuple width
fn write_row(row: &mut RowWriter, row_idx: usize, batch: &RecordBatch) -> usize {
// Get the row from the batch denoted by row_idx
for ((i, f), col) in batch
.schema()
.fields()
.iter()
.enumerate()
.zip(batch.columns().iter())
{
if !col.is_null(row_idx) {
if row.null_free {
for ((i, f), col) in batch
.schema()
.fields()
.iter()
.enumerate()
.zip(batch.columns().iter())
{
write_field(i, row_idx, col, f.data_type(), row);
} else {
row.set_null_at(i);
}
} else {
for ((i, f), col) in batch
.schema()
.fields()
.iter()
.enumerate()
.zip(batch.columns().iter())
{
if !col.is_null(row_idx) {
row.set_non_null_at(i);
write_field(i, row_idx, col, f.data_type(), row);
} else {
row.set_null_at(i);
}
}
}

Expand Down Expand Up @@ -392,6 +411,7 @@ fn gen_write_row(
.param("row", PTR)
.param("row_idx", I64)
.param("batch", PTR);
let null_free = schema_null_free(schema);
let mut b = builder.enter_block();
for (i, f) in schema.fields().iter().enumerate() {
let dt = f.data_type();
Expand Down Expand Up @@ -423,7 +443,9 @@ fn gen_write_row(
},
)?;
} else {
b.call_stmt("set_non_null_at", vec![b.id("row")?, b.lit_i(i as i64)])?;
if !null_free {
b.call_stmt("set_non_null_at", vec![b.id("row")?, b.lit_i(i as i64)])?;
}
let params = vec![
b.id("row")?,
b.id(&arr)?,
Expand Down Expand Up @@ -550,7 +572,6 @@ fn write_field(
row: &mut RowWriter,
) {
use DataType::*;
row.set_non_null_at(col_idx);
match dt {
Boolean => write_field_bool(row, col, col_idx, row_idx),
UInt8 => write_field_u8(row, col, col_idx, row_idx),
Expand Down