diff --git a/datafusion/benches/jit.rs b/datafusion/benches/jit.rs index b198b158c319..6abebc294185 100644 --- a/datafusion/benches/jit.rs +++ b/datafusion/benches/jit.rs @@ -23,9 +23,7 @@ extern crate datafusion; mod data_utils; use crate::criterion::Criterion; use crate::data_utils::{create_record_batches, create_schema}; -use datafusion::row::writer::{ - bench_write_batch, bench_write_batch_jit, bench_write_batch_jit_dummy, -}; +use datafusion::row::writer::{bench_write_batch, bench_write_batch_jit}; use std::sync::Arc; fn criterion_benchmark(c: &mut Criterion) { @@ -48,10 +46,6 @@ fn criterion_benchmark(c: &mut Criterion) { criterion::black_box(bench_write_batch_jit(&batches, schema.clone()).unwrap()) }) }); - - c.bench_function("row serializer jit codegen only", |b| { - b.iter(|| bench_write_batch_jit_dummy(schema.clone()).unwrap()) - }); } criterion_group!(benches, criterion_benchmark); diff --git a/datafusion/src/row/mod.rs b/datafusion/src/row/mod.rs index 5cd9885238a9..531dbfe3e41e 100644 --- a/datafusion/src/row/mod.rs +++ b/datafusion/src/row/mod.rs @@ -212,6 +212,11 @@ fn fn_name(f: T) -> &'static str { } } +/// Tell if schema contains no nullable field +pub fn schema_null_free(schema: &Arc) -> bool { + schema.fields().iter().all(|f| !f.is_nullable()) +} + #[cfg(test)] mod tests { use super::*; @@ -323,7 +328,7 @@ mod tests { #[test] #[allow(non_snake_case)] fn []() -> Result<()> { - let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, false)])); + let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, true)])); let a = $ARRAY::from($VEC); let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; let mut vector = vec![0; 1024]; @@ -349,6 +354,38 @@ mod tests { assert_eq!(batch, output_batch); Ok(()) } + + #[test] + #[allow(non_snake_case)] + fn []() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, false)])); + let v = $VEC.into_iter().filter(|o| o.is_some()).collect::>(); + let a = $ARRAY::from(v); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; + let mut vector = vec![0; 1024]; + let row_offsets = + { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) }; + let output_batch = { read_as_batch(&vector, schema, row_offsets)? }; + assert_eq!(batch, output_batch); + Ok(()) + } + + #[test] + #[allow(non_snake_case)] + #[cfg(feature = "jit")] + fn []() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, false)])); + let v = $VEC.into_iter().filter(|o| o.is_some()).collect::>(); + let a = $ARRAY::from(v); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; + let mut vector = vec![0; 1024]; + let assembler = Assembler::default(); + let row_offsets = + { write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler)? }; + let output_batch = { read_as_batch_jit(&vector, schema, row_offsets, &assembler)? }; + assert_eq!(batch, output_batch); + Ok(()) + } } }; } @@ -439,7 +476,7 @@ mod tests { #[test] fn test_single_binary() -> Result<()> { - let schema = Arc::new(Schema::new(vec![Field::new("a", Binary, false)])); + let schema = Arc::new(Schema::new(vec![Field::new("a", Binary, true)])); let values: Vec> = vec![Some(b"one"), Some(b"two"), None, Some(b""), Some(b"three")]; let a = BinaryArray::from_opt_vec(values); @@ -478,6 +515,45 @@ mod tests { Ok(()) } + #[test] + fn test_single_binary_null_free() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", Binary, false)])); + let values: Vec<&[u8]> = vec![b"one", b"two", b"", b"three"]; + let a = BinaryArray::from_vec(values); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; + let mut vector = vec![0; 8192]; + let row_offsets = + { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) }; + let output_batch = { read_as_batch(&vector, schema, row_offsets)? }; + assert_eq!(batch, output_batch); + Ok(()) + } + + #[test] + #[cfg(feature = "jit")] + fn test_single_binary_jit_null_free() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", Binary, false)])); + let values: Vec<&[u8]> = vec![b"one", b"two", b"", b"three"]; + let a = BinaryArray::from_vec(values); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; + let mut vector = vec![0; 8192]; + let assembler = Assembler::default(); + let row_offsets = { + write_batch_unchecked_jit( + &mut vector, + 0, + &batch, + 0, + schema.clone(), + &assembler, + )? + }; + let output_batch = + { read_as_batch_jit(&vector, schema, row_offsets, &assembler)? }; + assert_eq!(batch, output_batch); + Ok(()) + } + #[tokio::test] async fn test_with_parquet() -> Result<()> { let runtime = Arc::new(RuntimeEnv::default()); diff --git a/datafusion/src/row/reader.rs b/datafusion/src/row/reader.rs index 213c34b574ad..3e2c45363987 100644 --- a/datafusion/src/row/reader.rs +++ b/datafusion/src/row/reader.rs @@ -22,7 +22,9 @@ use crate::error::{DataFusionError, Result}; use crate::reg_fn; #[cfg(feature = "jit")] use crate::row::fn_name; -use crate::row::{all_valid, get_offsets, supported, NullBitsFormatter}; +use crate::row::{ + all_valid, get_offsets, schema_null_free, supported, NullBitsFormatter, +}; use arrow::array::*; use arrow::datatypes::{DataType, Schema}; use arrow::error::Result as ArrowResult; @@ -133,16 +135,22 @@ pub struct RowReader<'a> { /// For fixed length fields, it's where the actual data stores. /// For variable length fields, it's a pack of (offset << 32 | length) if we use u64. field_offsets: Vec, + /// If a row is null free according to its schema + null_free: bool, } impl<'a> std::fmt::Debug for RowReader<'a> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let null_bits = self.null_bits(); - write!( - f, - "{:?}", - NullBitsFormatter::new(null_bits, self.field_count) - ) + if self.null_free { + write!(f, "null_free") + } else { + let null_bits = self.null_bits(); + write!( + f, + "{:?}", + NullBitsFormatter::new(null_bits, self.field_count) + ) + } } } @@ -150,8 +158,9 @@ impl<'a> RowReader<'a> { /// new pub fn new(schema: &Arc, data: &'a [u8]) -> Self { assert!(supported(schema)); + let null_free = schema_null_free(schema); let field_count = schema.fields().len(); - let null_width = ceil(field_count, 8); + let null_width = if null_free { 0 } else { ceil(field_count, 8) }; let (field_offsets, _) = get_offsets(null_width, schema); Self { data, @@ -159,6 +168,7 @@ impl<'a> RowReader<'a> { field_count, null_width, field_offsets, + null_free, } } @@ -174,14 +184,22 @@ impl<'a> RowReader<'a> { #[inline(always)] fn null_bits(&self) -> &[u8] { - let start = self.base_offset; - &self.data[start..start + self.null_width] + if self.null_free { + &[] + } else { + let start = self.base_offset; + &self.data[start..start + self.null_width] + } } #[inline(always)] fn all_valid(&self) -> bool { - let null_bits = self.null_bits(); - all_valid(null_bits, self.field_count) + if self.null_free { + true + } else { + let null_bits = self.null_bits(); + all_valid(null_bits, self.field_count) + } } fn is_valid_at(&self, idx: usize) -> bool { @@ -276,7 +294,7 @@ impl<'a> RowReader<'a> { } fn read_row(row: &RowReader, batch: &mut MutableRecordBatch, schema: &Arc) { - if row.all_valid() { + if row.null_free || row.all_valid() { for ((col_idx, to), field) in batch .arrays .iter_mut() @@ -325,21 +343,21 @@ fn register_read_functions(asm: &Assembler) -> Result<()> { reg_fn!(asm, read_field_date64, reader_param.clone(), None); reg_fn!(asm, read_field_utf8, reader_param.clone(), None); reg_fn!(asm, read_field_binary, reader_param.clone(), None); - reg_fn!(asm, read_field_bool_nf, reader_param.clone(), None); - reg_fn!(asm, read_field_u8_nf, reader_param.clone(), None); - reg_fn!(asm, read_field_u16_nf, reader_param.clone(), None); - reg_fn!(asm, read_field_u32_nf, reader_param.clone(), None); - reg_fn!(asm, read_field_u64_nf, reader_param.clone(), None); - reg_fn!(asm, read_field_i8_nf, reader_param.clone(), None); - reg_fn!(asm, read_field_i16_nf, reader_param.clone(), None); - reg_fn!(asm, read_field_i32_nf, reader_param.clone(), None); - reg_fn!(asm, read_field_i64_nf, reader_param.clone(), None); - reg_fn!(asm, read_field_f32_nf, reader_param.clone(), None); - reg_fn!(asm, read_field_f64_nf, reader_param.clone(), None); - reg_fn!(asm, read_field_date32_nf, reader_param.clone(), None); - reg_fn!(asm, read_field_date64_nf, reader_param.clone(), None); - reg_fn!(asm, read_field_utf8_nf, reader_param.clone(), None); - reg_fn!(asm, read_field_binary_nf, reader_param, None); + reg_fn!(asm, read_field_bool_null_free, reader_param.clone(), None); + reg_fn!(asm, read_field_u8_null_free, reader_param.clone(), None); + reg_fn!(asm, read_field_u16_null_free, reader_param.clone(), None); + reg_fn!(asm, read_field_u32_null_free, reader_param.clone(), None); + reg_fn!(asm, read_field_u64_null_free, reader_param.clone(), None); + reg_fn!(asm, read_field_i8_null_free, reader_param.clone(), None); + reg_fn!(asm, read_field_i16_null_free, reader_param.clone(), None); + reg_fn!(asm, read_field_i32_null_free, reader_param.clone(), None); + reg_fn!(asm, read_field_i64_null_free, reader_param.clone(), None); + reg_fn!(asm, read_field_f32_null_free, reader_param.clone(), None); + reg_fn!(asm, read_field_f64_null_free, reader_param.clone(), None); + reg_fn!(asm, read_field_date32_null_free, reader_param.clone(), None); + reg_fn!(asm, read_field_date64_null_free, reader_param.clone(), None); + reg_fn!(asm, read_field_utf8_null_free, reader_param.clone(), None); + reg_fn!(asm, read_field_binary_null_free, reader_param, None); Ok(()) } @@ -383,21 +401,21 @@ fn gen_read_row( } } else { match dt { - Boolean => b.call_stmt("read_field_bool_nf", params)?, - UInt8 => b.call_stmt("read_field_u8_nf", params)?, - UInt16 => b.call_stmt("read_field_u16_nf", params)?, - UInt32 => b.call_stmt("read_field_u32_nf", params)?, - UInt64 => b.call_stmt("read_field_u64_nf", params)?, - Int8 => b.call_stmt("read_field_i8_nf", params)?, - Int16 => b.call_stmt("read_field_i16_nf", params)?, - Int32 => b.call_stmt("read_field_i32_nf", params)?, - Int64 => b.call_stmt("read_field_i64_nf", params)?, - Float32 => b.call_stmt("read_field_f32_nf", params)?, - Float64 => b.call_stmt("read_field_f64_nf", params)?, - Date32 => b.call_stmt("read_field_date32_nf", params)?, - Date64 => b.call_stmt("read_field_date64_nf", params)?, - Utf8 => b.call_stmt("read_field_utf8_nf", params)?, - Binary => b.call_stmt("read_field_binary_nf", params)?, + Boolean => b.call_stmt("read_field_bool_null_free", params)?, + UInt8 => b.call_stmt("read_field_u8_null_free", params)?, + UInt16 => b.call_stmt("read_field_u16_null_free", params)?, + UInt32 => b.call_stmt("read_field_u32_null_free", params)?, + UInt64 => b.call_stmt("read_field_u64_null_free", params)?, + Int8 => b.call_stmt("read_field_i8_null_free", params)?, + Int16 => b.call_stmt("read_field_i16_null_free", params)?, + Int32 => b.call_stmt("read_field_i32_null_free", params)?, + Int64 => b.call_stmt("read_field_i64_null_free", params)?, + Float32 => b.call_stmt("read_field_f32_null_free", params)?, + Float64 => b.call_stmt("read_field_f64_null_free", params)?, + Date32 => b.call_stmt("read_field_date32_null_free", params)?, + Date64 => b.call_stmt("read_field_date64_null_free", params)?, + Utf8 => b.call_stmt("read_field_utf8_null_free", params)?, + Binary => b.call_stmt("read_field_binary_null_free", params)?, _ => unimplemented!(), } } @@ -418,7 +436,7 @@ macro_rules! fn_read_field { .unwrap(); } - fn [](to: &mut Box, col_idx: usize, row: &RowReader) { + fn [](to: &mut Box, col_idx: usize, row: &RowReader) { let to = to .as_any_mut() .downcast_mut::<$ARRAY>() @@ -455,7 +473,11 @@ fn read_field_binary(to: &mut Box, col_idx: usize, row: &RowRe } } -fn read_field_binary_nf(to: &mut Box, col_idx: usize, row: &RowReader) { +fn read_field_binary_null_free( + to: &mut Box, + col_idx: usize, + row: &RowReader, +) { let to = to.as_any_mut().downcast_mut::().unwrap(); to.append_value(row.get_binary(col_idx)) .map_err(DataFusionError::ArrowError) @@ -497,21 +519,21 @@ fn read_field_null_free( ) { use DataType::*; match dt { - Boolean => read_field_bool_nf(to, col_idx, row), - UInt8 => read_field_u8_nf(to, col_idx, row), - UInt16 => read_field_u16_nf(to, col_idx, row), - UInt32 => read_field_u32_nf(to, col_idx, row), - UInt64 => read_field_u64_nf(to, col_idx, row), - Int8 => read_field_i8_nf(to, col_idx, row), - Int16 => read_field_i16_nf(to, col_idx, row), - Int32 => read_field_i32_nf(to, col_idx, row), - Int64 => read_field_i64_nf(to, col_idx, row), - Float32 => read_field_f32_nf(to, col_idx, row), - Float64 => read_field_f64_nf(to, col_idx, row), - Date32 => read_field_date32_nf(to, col_idx, row), - Date64 => read_field_date64_nf(to, col_idx, row), - Utf8 => read_field_utf8_nf(to, col_idx, row), - Binary => read_field_binary_nf(to, col_idx, row), + Boolean => read_field_bool_null_free(to, col_idx, row), + UInt8 => read_field_u8_null_free(to, col_idx, row), + UInt16 => read_field_u16_null_free(to, col_idx, row), + UInt32 => read_field_u32_null_free(to, col_idx, row), + UInt64 => read_field_u64_null_free(to, col_idx, row), + Int8 => read_field_i8_null_free(to, col_idx, row), + Int16 => read_field_i16_null_free(to, col_idx, row), + Int32 => read_field_i32_null_free(to, col_idx, row), + Int64 => read_field_i64_null_free(to, col_idx, row), + Float32 => read_field_f32_null_free(to, col_idx, row), + Float64 => read_field_f64_null_free(to, col_idx, row), + Date32 => read_field_date32_null_free(to, col_idx, row), + Date64 => read_field_date64_null_free(to, col_idx, row), + Utf8 => read_field_utf8_null_free(to, col_idx, row), + Binary => read_field_binary_null_free(to, col_idx, row), _ => unimplemented!(), } } diff --git a/datafusion/src/row/writer.rs b/datafusion/src/row/writer.rs index 2206e350bcb2..ec3b27c318be 100644 --- a/datafusion/src/row/writer.rs +++ b/datafusion/src/row/writer.rs @@ -17,19 +17,24 @@ //! Reusable row writer backed by Vec to stitch attributes together +#[cfg(feature = "jit")] use crate::error::Result; #[cfg(feature = "jit")] use crate::reg_fn; #[cfg(feature = "jit")] use crate::row::fn_name; -use crate::row::{estimate_row_width, fixed_size, get_offsets, supported}; +use crate::row::{ + estimate_row_width, fixed_size, get_offsets, schema_null_free, supported, +}; use arrow::array::*; use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; use arrow::util::bit_util::{ceil, round_upto_power_of_2, set_bit_raw, unset_bit_raw}; +#[cfg(feature = "jit")] use datafusion_jit::api::CodeBlock; #[cfg(feature = "jit")] use datafusion_jit::api::{Assembler, GeneratedFunction}; +#[cfg(feature = "jit")] use datafusion_jit::ast::Expr; #[cfg(feature = "jit")] use datafusion_jit::ast::{BOOL, I64, PTR}; @@ -147,17 +152,6 @@ pub fn bench_write_batch_jit( Ok(lengths) } -#[cfg(feature = "jit")] -/// bench code generation cost -pub fn bench_write_batch_jit_dummy(schema: Arc) -> Result<()> { - let assembler = Assembler::default(); - register_write_functions(&assembler)?; - let gen_func = gen_write_row(&schema, &assembler)?; - let mut jit = assembler.create_jit(); - let _: *const u8 = jit.compile(gen_func)?; - Ok(()) -} - macro_rules! set_idx { ($WIDTH: literal, $SELF: ident, $IDX: ident, $VALUE: ident) => {{ $SELF.assert_index_valid($IDX); @@ -198,14 +192,17 @@ pub struct RowWriter { /// For fixed length fields, it's where the actual data stores. /// For variable length fields, it's a pack of (offset << 32 | length) if we use u64. field_offsets: Vec, + /// If a row is null free according to its schema + null_free: bool, } impl RowWriter { /// new pub fn new(schema: &Arc) -> Self { assert!(supported(schema)); + let null_free = schema_null_free(schema); let field_count = schema.fields().len(); - let null_width = ceil(field_count, 8); + let null_width = if null_free { 0 } else { ceil(field_count, 8) }; let (field_offsets, values_width) = get_offsets(null_width, schema); let mut init_capacity = estimate_row_width(null_width, schema); if !fixed_size(schema) { @@ -221,6 +218,7 @@ impl RowWriter { varlena_width: 0, varlena_offset: null_width + values_width, field_offsets, + null_free, } } @@ -238,6 +236,10 @@ impl RowWriter { } fn set_null_at(&mut self, idx: usize) { + assert!( + !self.null_free, + "Unexpected call to set_null_at on null-free row writer" + ); let null_bits = &mut self.data[0..self.null_width]; unsafe { unset_bit_raw(null_bits.as_mut_ptr(), idx); @@ -245,6 +247,10 @@ impl RowWriter { } fn set_non_null_at(&mut self, idx: usize) { + assert!( + !self.null_free, + "Unexpected call to set_non_null_at on null-free row writer" + ); let null_bits = &mut self.data[0..self.null_width]; unsafe { set_bit_raw(null_bits.as_mut_ptr(), idx); @@ -333,17 +339,30 @@ impl RowWriter { /// Stitch attributes of tuple in `batch` at `row_idx` and returns the tuple width fn write_row(row: &mut RowWriter, row_idx: usize, batch: &RecordBatch) -> usize { // Get the row from the batch denoted by row_idx - for ((i, f), col) in batch - .schema() - .fields() - .iter() - .enumerate() - .zip(batch.columns().iter()) - { - if !col.is_null(row_idx) { + if row.null_free { + for ((i, f), col) in batch + .schema() + .fields() + .iter() + .enumerate() + .zip(batch.columns().iter()) + { write_field(i, row_idx, col, f.data_type(), row); - } else { - row.set_null_at(i); + } + } else { + for ((i, f), col) in batch + .schema() + .fields() + .iter() + .enumerate() + .zip(batch.columns().iter()) + { + if !col.is_null(row_idx) { + row.set_non_null_at(i); + write_field(i, row_idx, col, f.data_type(), row); + } else { + row.set_null_at(i); + } } } @@ -392,6 +411,7 @@ fn gen_write_row( .param("row", PTR) .param("row_idx", I64) .param("batch", PTR); + let null_free = schema_null_free(schema); let mut b = builder.enter_block(); for (i, f) in schema.fields().iter().enumerate() { let dt = f.data_type(); @@ -423,7 +443,9 @@ fn gen_write_row( }, )?; } else { - b.call_stmt("set_non_null_at", vec![b.id("row")?, b.lit_i(i as i64)])?; + if !null_free { + b.call_stmt("set_non_null_at", vec![b.id("row")?, b.lit_i(i as i64)])?; + } let params = vec![ b.id("row")?, b.id(&arr)?, @@ -550,7 +572,6 @@ fn write_field( row: &mut RowWriter, ) { use DataType::*; - row.set_non_null_at(col_idx); match dt { Boolean => write_field_bool(row, col, col_idx, row_idx), UInt8 => write_field_u8(row, col, col_idx, row_idx),