From fe4cb8a4e25c56e973f3ce67112ac3ba9a9355f9 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 14 Jul 2023 15:31:07 -0400 Subject: [PATCH 1/4] Remove `RowAccumulators` --- .../core/src/physical_plan/aggregates/mod.rs | 17 - datafusion/core/src/physical_plan/mod.rs | 1 - .../physical-expr/src/aggregate/average.rs | 134 -------- .../src/aggregate/bit_and_or_xor.rs | 298 ------------------ .../src/aggregate/bool_and_or.rs | 196 ------------ .../physical-expr/src/aggregate/count.rs | 86 ----- .../physical-expr/src/aggregate/min_max.rs | 222 ------------- datafusion/physical-expr/src/aggregate/mod.rs | 21 -- .../src/aggregate/row_accumulator.rs | 99 ------ datafusion/physical-expr/src/aggregate/sum.rs | 245 +------------- 10 files changed, 1 insertion(+), 1318 deletions(-) delete mode 100644 datafusion/physical-expr/src/aggregate/row_accumulator.rs diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs index 9d8ced18dac3..5b4e6dbdf024 100644 --- a/datafusion/core/src/physical_plan/aggregates/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/mod.rs @@ -33,7 +33,6 @@ use datafusion_common::{DataFusionError, Result}; use datafusion_execution::TaskContext; use datafusion_expr::Accumulator; use datafusion_physical_expr::{ - aggregate::row_accumulator::RowAccumulator, equivalence::project_equivalence_properties, expressions::{Avg, CastExpr, Column, Sum}, normalize_out_expr_with_columns_map, reverse_order_bys, @@ -1093,7 +1092,6 @@ fn merge_expressions( } pub(crate) type AccumulatorItem = Box; -pub(crate) type RowAccumulatorItem = Box; fn create_accumulators( aggr_expr: &[Arc], @@ -1104,21 +1102,6 @@ fn create_accumulators( .collect::>>() } -#[allow(dead_code)] -fn create_row_accumulators( - aggr_expr: &[Arc], -) -> Result> { - let mut state_index = 0; - aggr_expr - .iter() - .map(|expr| { - let result = expr.create_row_accumulator(state_index); - state_index += expr.state_fields().unwrap().len(); - result - }) - .collect::>>() -} - /// returns a vector of ArrayRefs, where each entry corresponds to either the /// final value (mode = Final, FinalPartitioned and Single) or states (mode = Partial) fn finalize_aggregation( diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index 335b10eb46da..0d322bfb11e2 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -32,7 +32,6 @@ use arrow::record_batch::RecordBatch; use datafusion_common::utils::DataPtr; pub use datafusion_expr::Accumulator; pub use datafusion_expr::ColumnarValue; -pub use datafusion_physical_expr::aggregate::row_accumulator::RowAccumulator; use datafusion_physical_expr::equivalence::OrderingEquivalenceProperties; pub use display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay}; use futures::stream::{Stream, TryStreamExt}; diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs index 6ad0f94e032a..a1d77a2d8849 100644 --- a/datafusion/physical-expr/src/aggregate/average.rs +++ b/datafusion/physical-expr/src/aggregate/average.rs @@ -25,9 +25,6 @@ use std::convert::TryFrom; use std::sync::Arc; use crate::aggregate::groups_accumulator::accumulate::NullState; -use crate::aggregate::row_accumulator::{ - is_row_accumulator_support_dtype, RowAccumulator, -}; use crate::aggregate::sum; use crate::aggregate::sum::sum_batch; use crate::aggregate::utils::calculate_result_decimal_for_avg; @@ -46,7 +43,6 @@ use arrow_array::{ use datafusion_common::{downcast_value, ScalarValue}; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::Accumulator; -use datafusion_row::accessor::RowAccessor; use super::groups_accumulator::EmitTo; use super::utils::{adjust_output_array, Decimal128Averager}; @@ -139,21 +135,6 @@ impl AggregateExpr for Avg { &self.name } - fn row_accumulator_supported(&self) -> bool { - is_row_accumulator_support_dtype(&self.sum_data_type) - } - - fn create_row_accumulator( - &self, - start_index: usize, - ) -> Result> { - Ok(Box::new(AvgRowAccumulator::new( - start_index, - &self.sum_data_type, - &self.rt_data_type, - ))) - } - fn reverse_expr(&self) -> Option> { Some(Arc::new(self.clone())) } @@ -321,121 +302,6 @@ impl Accumulator for AvgAccumulator { } } -#[derive(Debug)] -struct AvgRowAccumulator { - state_index: usize, - sum_datatype: DataType, - return_data_type: DataType, -} - -impl AvgRowAccumulator { - pub fn new( - start_index: usize, - sum_datatype: &DataType, - return_data_type: &DataType, - ) -> Self { - Self { - state_index: start_index, - sum_datatype: sum_datatype.clone(), - return_data_type: return_data_type.clone(), - } - } -} - -impl RowAccumulator for AvgRowAccumulator { - fn update_batch( - &mut self, - values: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - let values = &values[0]; - // count - let delta = (values.len() - values.null_count()) as u64; - accessor.add_u64(self.state_index(), delta); - - // sum - sum::add_to_row( - self.state_index() + 1, - accessor, - &sum::sum_batch(values, &self.sum_datatype)?, - ) - } - - fn update_scalar_values( - &mut self, - values: &[ScalarValue], - accessor: &mut RowAccessor, - ) -> Result<()> { - let value = &values[0]; - sum::update_avg_to_row(self.state_index(), accessor, value) - } - - fn update_scalar( - &mut self, - value: &ScalarValue, - accessor: &mut RowAccessor, - ) -> Result<()> { - sum::update_avg_to_row(self.state_index(), accessor, value) - } - - fn merge_batch( - &mut self, - states: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - let counts = downcast_value!(states[0], UInt64Array); - // count - let delta = compute::sum(counts).unwrap_or(0); - accessor.add_u64(self.state_index(), delta); - - // sum - let difference = sum::sum_batch(&states[1], &self.sum_datatype)?; - sum::add_to_row(self.state_index() + 1, accessor, &difference) - } - - fn evaluate(&self, accessor: &RowAccessor) -> Result { - match self.sum_datatype { - DataType::Decimal128(p, s) => { - match accessor.get_u64_opt(self.state_index()) { - None => Ok(ScalarValue::Decimal128(None, p, s)), - Some(0) => Ok(ScalarValue::Decimal128(None, p, s)), - Some(n) => { - // now the sum_type and return type is not the same, need to convert the sum type to return type - accessor.get_i128_opt(self.state_index() + 1).map_or_else( - || Ok(ScalarValue::Decimal128(None, p, s)), - |f| { - calculate_result_decimal_for_avg( - f, - n as i128, - s, - &self.return_data_type, - ) - }, - ) - } - } - } - DataType::Float64 => Ok(match accessor.get_u64_opt(self.state_index()) { - None => ScalarValue::Float64(None), - Some(0) => ScalarValue::Float64(None), - Some(n) => ScalarValue::Float64( - accessor - .get_f64_opt(self.state_index() + 1) - .map(|f| f / n as f64), - ), - }), - _ => Err(DataFusionError::Internal( - "Sum should be f64 or decimal128 on average".to_string(), - )), - } - } - - #[inline(always)] - fn state_index(&self) -> usize { - self.state_index - } -} - /// An accumulator to compute the average of `[PrimitiveArray]`. /// Stores values as native types, and does overflow checking /// diff --git a/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs b/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs index 4c7733520e0c..dfc07cc04bea 100644 --- a/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs +++ b/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs @@ -39,14 +39,10 @@ use datafusion_expr::Accumulator; use std::collections::HashSet; use crate::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator; -use crate::aggregate::row_accumulator::{ - is_row_accumulator_support_dtype, RowAccumulator, -}; use crate::aggregate::utils::down_cast_any_ref; use crate::expressions::format_state_name; use arrow::array::Array; use arrow::compute::{bit_and, bit_or, bit_xor}; -use datafusion_row::accessor::RowAccessor; /// Creates a [`PrimitiveGroupsAccumulator`] with the specified /// [`ArrowPrimitiveType`] that initailizes each accumulator to $START @@ -123,82 +119,6 @@ fn bit_xor_batch(values: &ArrayRef) -> Result { bit_and_or_xor_batch!(values, bit_xor) } -// bit_and/bit_or/bit_xor of two scalar values. -macro_rules! typed_bit_and_or_xor_v2 { - ($INDEX:ident, $ACC:ident, $SCALAR:expr, $TYPE:ident, $OP:ident) => {{ - paste::item! { - match $SCALAR { - None => {} - Some(v) => $ACC.[<$OP _ $TYPE>]($INDEX, *v as $TYPE) - } - } - }}; -} - -macro_rules! bit_and_or_xor_v2 { - ($INDEX:ident, $ACC:ident, $SCALAR:expr, $OP:ident) => {{ - Ok(match $SCALAR { - ScalarValue::UInt64(rhs) => { - typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, u64, $OP) - } - ScalarValue::UInt32(rhs) => { - typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, u32, $OP) - } - ScalarValue::UInt16(rhs) => { - typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, u16, $OP) - } - ScalarValue::UInt8(rhs) => { - typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, u8, $OP) - } - ScalarValue::Int64(rhs) => { - typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, i64, $OP) - } - ScalarValue::Int32(rhs) => { - typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, i32, $OP) - } - ScalarValue::Int16(rhs) => { - typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, i16, $OP) - } - ScalarValue::Int8(rhs) => { - typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, i8, $OP) - } - ScalarValue::Null => { - // do nothing - } - e => { - return Err(DataFusionError::Internal(format!( - "BIT AND/BIT OR/BIT XOR is not expected to receive scalars of incompatible types {:?}", - e - ))) - } - }) - }}; -} - -pub fn bit_and_row( - index: usize, - accessor: &mut RowAccessor, - s: &ScalarValue, -) -> Result<()> { - bit_and_or_xor_v2!(index, accessor, s, bitand) -} - -pub fn bit_or_row( - index: usize, - accessor: &mut RowAccessor, - s: &ScalarValue, -) -> Result<()> { - bit_and_or_xor_v2!(index, accessor, s, bitor) -} - -pub fn bit_xor_row( - index: usize, - accessor: &mut RowAccessor, - s: &ScalarValue, -) -> Result<()> { - bit_and_or_xor_v2!(index, accessor, s, bitxor) -} - /// BIT_AND aggregate expression #[derive(Debug, Clone)] pub struct BitAnd { @@ -258,20 +178,6 @@ impl AggregateExpr for BitAnd { &self.name } - fn row_accumulator_supported(&self) -> bool { - is_row_accumulator_support_dtype(&self.data_type) - } - - fn create_row_accumulator( - &self, - start_index: usize, - ) -> Result> { - Ok(Box::new(BitAndRowAccumulator::new( - start_index, - self.data_type.clone(), - ))) - } - fn groups_accumulator_supported(&self) -> bool { true } @@ -378,64 +284,6 @@ impl Accumulator for BitAndAccumulator { } } -#[derive(Debug)] -struct BitAndRowAccumulator { - index: usize, - datatype: DataType, -} - -impl BitAndRowAccumulator { - pub fn new(index: usize, datatype: DataType) -> Self { - Self { index, datatype } - } -} - -impl RowAccumulator for BitAndRowAccumulator { - fn update_batch( - &mut self, - values: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - let values = &values[0]; - let delta = &bit_and_batch(values)?; - bit_and_row(self.index, accessor, delta) - } - - fn update_scalar_values( - &mut self, - values: &[ScalarValue], - accessor: &mut RowAccessor, - ) -> Result<()> { - let value = &values[0]; - bit_and_row(self.index, accessor, value) - } - - fn update_scalar( - &mut self, - value: &ScalarValue, - accessor: &mut RowAccessor, - ) -> Result<()> { - bit_and_row(self.index, accessor, value) - } - - fn merge_batch( - &mut self, - states: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - self.update_batch(states, accessor) - } - - fn evaluate(&self, accessor: &RowAccessor) -> Result { - Ok(accessor.get_as_scalar(&self.datatype, self.index)) - } - - #[inline(always)] - fn state_index(&self) -> usize { - self.index - } -} - /// BIT_OR aggregate expression #[derive(Debug, Clone)] pub struct BitOr { @@ -495,20 +343,6 @@ impl AggregateExpr for BitOr { &self.name } - fn row_accumulator_supported(&self) -> bool { - is_row_accumulator_support_dtype(&self.data_type) - } - - fn create_row_accumulator( - &self, - start_index: usize, - ) -> Result> { - Ok(Box::new(BitOrRowAccumulator::new( - start_index, - self.data_type.clone(), - ))) - } - fn groups_accumulator_supported(&self) -> bool { true } @@ -608,65 +442,6 @@ impl Accumulator for BitOrAccumulator { } } -#[derive(Debug)] -struct BitOrRowAccumulator { - index: usize, - datatype: DataType, -} - -impl BitOrRowAccumulator { - pub fn new(index: usize, datatype: DataType) -> Self { - Self { index, datatype } - } -} - -impl RowAccumulator for BitOrRowAccumulator { - fn update_batch( - &mut self, - values: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - let values = &values[0]; - let delta = &bit_or_batch(values)?; - bit_or_row(self.index, accessor, delta)?; - Ok(()) - } - - fn update_scalar_values( - &mut self, - values: &[ScalarValue], - accessor: &mut RowAccessor, - ) -> Result<()> { - let value = &values[0]; - bit_or_row(self.index, accessor, value) - } - - fn update_scalar( - &mut self, - value: &ScalarValue, - accessor: &mut RowAccessor, - ) -> Result<()> { - bit_or_row(self.index, accessor, value) - } - - fn merge_batch( - &mut self, - states: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - self.update_batch(states, accessor) - } - - fn evaluate(&self, accessor: &RowAccessor) -> Result { - Ok(accessor.get_as_scalar(&self.datatype, self.index)) - } - - #[inline(always)] - fn state_index(&self) -> usize { - self.index - } -} - /// BIT_XOR aggregate expression #[derive(Debug, Clone)] pub struct BitXor { @@ -726,20 +501,6 @@ impl AggregateExpr for BitXor { &self.name } - fn row_accumulator_supported(&self) -> bool { - is_row_accumulator_support_dtype(&self.data_type) - } - - fn create_row_accumulator( - &self, - start_index: usize, - ) -> Result> { - Ok(Box::new(BitXorRowAccumulator::new( - start_index, - self.data_type.clone(), - ))) - } - fn groups_accumulator_supported(&self) -> bool { true } @@ -839,65 +600,6 @@ impl Accumulator for BitXorAccumulator { } } -#[derive(Debug)] -struct BitXorRowAccumulator { - index: usize, - datatype: DataType, -} - -impl BitXorRowAccumulator { - pub fn new(index: usize, datatype: DataType) -> Self { - Self { index, datatype } - } -} - -impl RowAccumulator for BitXorRowAccumulator { - fn update_batch( - &mut self, - values: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - let values = &values[0]; - let delta = &bit_xor_batch(values)?; - bit_xor_row(self.index, accessor, delta)?; - Ok(()) - } - - fn update_scalar_values( - &mut self, - values: &[ScalarValue], - accessor: &mut RowAccessor, - ) -> Result<()> { - let value = &values[0]; - bit_xor_row(self.index, accessor, value) - } - - fn update_scalar( - &mut self, - value: &ScalarValue, - accessor: &mut RowAccessor, - ) -> Result<()> { - bit_xor_row(self.index, accessor, value) - } - - fn merge_batch( - &mut self, - states: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - self.update_batch(states, accessor) - } - - fn evaluate(&self, accessor: &RowAccessor) -> Result { - Ok(accessor.get_as_scalar(&self.datatype, self.index)) - } - - #[inline(always)] - fn state_index(&self) -> usize { - self.index - } -} - /// Expression for a BIT_XOR(DISTINCT) aggregation. #[derive(Debug, Clone)] pub struct DistinctBitXor { diff --git a/datafusion/physical-expr/src/aggregate/bool_and_or.rs b/datafusion/physical-expr/src/aggregate/bool_and_or.rs index 6107b0972c81..f963f680dfe2 100644 --- a/datafusion/physical-expr/src/aggregate/bool_and_or.rs +++ b/datafusion/physical-expr/src/aggregate/bool_and_or.rs @@ -29,14 +29,10 @@ use std::any::Any; use std::sync::Arc; use crate::aggregate::groups_accumulator::bool_op::BooleanGroupsAccumulator; -use crate::aggregate::row_accumulator::{ - is_row_accumulator_support_dtype, RowAccumulator, -}; use crate::aggregate::utils::down_cast_any_ref; use crate::expressions::format_state_name; use arrow::array::Array; use arrow::compute::{bool_and, bool_or}; -use datafusion_row::accessor::RowAccessor; // returns the new value after bool_and/bool_or with the new values, taking nullability into account macro_rules! typed_bool_and_or_batch { @@ -73,53 +69,6 @@ fn bool_or_batch(values: &ArrayRef) -> Result { bool_and_or_batch!(values, bool_or) } -// bool_and/bool_or of two scalar values. -macro_rules! typed_bool_and_or_v2 { - ($INDEX:ident, $ACC:ident, $SCALAR:expr, $TYPE:ident, $OP:ident) => {{ - paste::item! { - match $SCALAR { - None => {} - Some(v) => $ACC.[<$OP _ $TYPE>]($INDEX, *v as $TYPE) - } - } - }}; -} - -macro_rules! bool_and_or_v2 { - ($INDEX:ident, $ACC:ident, $SCALAR:expr, $OP:ident) => {{ - Ok(match $SCALAR { - ScalarValue::Boolean(rhs) => { - typed_bool_and_or_v2!($INDEX, $ACC, rhs, bool, $OP) - } - ScalarValue::Null => { - // do nothing - } - e => { - return Err(DataFusionError::Internal(format!( - "BOOL AND/BOOL OR is not expected to receive scalars of incompatible types {:?}", - e - ))) - } - }) - }}; -} - -pub fn bool_and_row( - index: usize, - accessor: &mut RowAccessor, - s: &ScalarValue, -) -> Result<()> { - bool_and_or_v2!(index, accessor, s, bitand) -} - -pub fn bool_or_row( - index: usize, - accessor: &mut RowAccessor, - s: &ScalarValue, -) -> Result<()> { - bool_and_or_v2!(index, accessor, s, bitor) -} - /// BOOL_AND aggregate expression #[derive(Debug, Clone)] pub struct BoolAnd { @@ -179,20 +128,6 @@ impl AggregateExpr for BoolAnd { &self.name } - fn row_accumulator_supported(&self) -> bool { - is_row_accumulator_support_dtype(&self.data_type) - } - - fn create_row_accumulator( - &self, - start_index: usize, - ) -> Result> { - Ok(Box::new(BoolAndRowAccumulator::new( - start_index, - self.data_type.clone(), - ))) - } - fn groups_accumulator_supported(&self) -> bool { true } @@ -267,64 +202,6 @@ impl Accumulator for BoolAndAccumulator { } } -#[derive(Debug)] -struct BoolAndRowAccumulator { - index: usize, - datatype: DataType, -} - -impl BoolAndRowAccumulator { - pub fn new(index: usize, datatype: DataType) -> Self { - Self { index, datatype } - } -} - -impl RowAccumulator for BoolAndRowAccumulator { - fn update_batch( - &mut self, - values: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - let values = &values[0]; - let delta = &bool_and_batch(values)?; - bool_and_row(self.index, accessor, delta) - } - - fn update_scalar_values( - &mut self, - values: &[ScalarValue], - accessor: &mut RowAccessor, - ) -> Result<()> { - let value = &values[0]; - bool_and_row(self.index, accessor, value) - } - - fn update_scalar( - &mut self, - value: &ScalarValue, - accessor: &mut RowAccessor, - ) -> Result<()> { - bool_and_row(self.index, accessor, value) - } - - fn merge_batch( - &mut self, - states: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - self.update_batch(states, accessor) - } - - fn evaluate(&self, accessor: &RowAccessor) -> Result { - Ok(accessor.get_as_scalar(&self.datatype, self.index)) - } - - #[inline(always)] - fn state_index(&self) -> usize { - self.index - } -} - /// BOOL_OR aggregate expression #[derive(Debug, Clone)] pub struct BoolOr { @@ -384,20 +261,6 @@ impl AggregateExpr for BoolOr { &self.name } - fn row_accumulator_supported(&self) -> bool { - is_row_accumulator_support_dtype(&self.data_type) - } - - fn create_row_accumulator( - &self, - start_index: usize, - ) -> Result> { - Ok(Box::new(BoolOrRowAccumulator::new( - start_index, - self.data_type.clone(), - ))) - } - fn groups_accumulator_supported(&self) -> bool { true } @@ -472,65 +335,6 @@ impl Accumulator for BoolOrAccumulator { } } -#[derive(Debug)] -struct BoolOrRowAccumulator { - index: usize, - datatype: DataType, -} - -impl BoolOrRowAccumulator { - pub fn new(index: usize, datatype: DataType) -> Self { - Self { index, datatype } - } -} - -impl RowAccumulator for BoolOrRowAccumulator { - fn update_batch( - &mut self, - values: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - let values = &values[0]; - let delta = &bool_or_batch(values)?; - bool_or_row(self.index, accessor, delta)?; - Ok(()) - } - - fn update_scalar_values( - &mut self, - values: &[ScalarValue], - accessor: &mut RowAccessor, - ) -> Result<()> { - let value = &values[0]; - bool_or_row(self.index, accessor, value) - } - - fn update_scalar( - &mut self, - value: &ScalarValue, - accessor: &mut RowAccessor, - ) -> Result<()> { - bool_or_row(self.index, accessor, value) - } - - fn merge_batch( - &mut self, - states: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - self.update_batch(states, accessor) - } - - fn evaluate(&self, accessor: &RowAccessor) -> Result { - Ok(accessor.get_as_scalar(&self.datatype, self.index)) - } - - #[inline(always)] - fn state_index(&self) -> usize { - self.index - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs index 60e15a673a0c..178f08b48107 100644 --- a/datafusion/physical-expr/src/aggregate/count.rs +++ b/datafusion/physical-expr/src/aggregate/count.rs @@ -22,7 +22,6 @@ use std::fmt::Debug; use std::ops::BitAnd; use std::sync::Arc; -use crate::aggregate::row_accumulator::RowAccumulator; use crate::aggregate::utils::down_cast_any_ref; use crate::{AggregateExpr, GroupsAccumulator, PhysicalExpr}; use arrow::array::{Array, Int64Array}; @@ -36,7 +35,6 @@ use arrow_buffer::BooleanBuffer; use datafusion_common::{downcast_value, ScalarValue}; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::Accumulator; -use datafusion_row::accessor::RowAccessor; use crate::expressions::format_state_name; @@ -247,23 +245,12 @@ impl AggregateExpr for Count { &self.name } - fn row_accumulator_supported(&self) -> bool { - true - } - fn groups_accumulator_supported(&self) -> bool { // groups accumulator only supports `COUNT(c1)`, not // `COUNT(c1, c2)`, etc self.exprs.len() == 1 } - fn create_row_accumulator( - &self, - start_index: usize, - ) -> Result> { - Ok(Box::new(CountRowAccumulator::new(start_index))) - } - fn reverse_expr(&self) -> Option> { Some(Arc::new(self.clone())) } @@ -348,79 +335,6 @@ impl Accumulator for CountAccumulator { } } -#[derive(Debug)] -struct CountRowAccumulator { - state_index: usize, -} - -impl CountRowAccumulator { - pub fn new(index: usize) -> Self { - Self { state_index: index } - } -} - -impl RowAccumulator for CountRowAccumulator { - fn update_batch( - &mut self, - values: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - let array = &values[0]; - let delta = (array.len() - null_count_for_multiple_cols(values)) as u64; - accessor.add_u64(self.state_index, delta); - Ok(()) - } - - fn update_scalar_values( - &mut self, - values: &[ScalarValue], - accessor: &mut RowAccessor, - ) -> Result<()> { - if !values.iter().any(|s| matches!(s, ScalarValue::Null)) { - accessor.add_u64(self.state_index, 1) - } - Ok(()) - } - - fn update_scalar( - &mut self, - value: &ScalarValue, - accessor: &mut RowAccessor, - ) -> Result<()> { - match value { - ScalarValue::Null => { - // do not update the accumulator - } - _ => accessor.add_u64(self.state_index, 1), - } - Ok(()) - } - - fn merge_batch( - &mut self, - states: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - let counts = downcast_value!(states[0], Int64Array); - let delta = &compute::sum(counts); - if let Some(d) = delta { - accessor.add_i64(self.state_index, *d); - } - Ok(()) - } - - fn evaluate(&self, accessor: &RowAccessor) -> Result { - Ok(ScalarValue::Int64(Some( - accessor.get_u64_opt(self.state_index()).unwrap_or(0) as i64, - ))) - } - - #[inline(always)] - fn state_index(&self) -> usize { - self.state_index - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/physical-expr/src/aggregate/min_max.rs b/datafusion/physical-expr/src/aggregate/min_max.rs index cc230c174b4f..613e6532511b 100644 --- a/datafusion/physical-expr/src/aggregate/min_max.rs +++ b/datafusion/physical-expr/src/aggregate/min_max.rs @@ -48,14 +48,10 @@ use datafusion_common::ScalarValue; use datafusion_common::{downcast_value, DataFusionError, Result}; use datafusion_expr::Accumulator; -use crate::aggregate::row_accumulator::{ - is_row_accumulator_support_dtype, RowAccumulator, -}; use crate::aggregate::utils::down_cast_any_ref; use crate::expressions::format_state_name; use arrow::array::Array; use arrow::array::Decimal128Array; -use datafusion_row::accessor::RowAccessor; use super::moving_min_max; @@ -172,10 +168,6 @@ impl AggregateExpr for Max { &self.name } - fn row_accumulator_supported(&self) -> bool { - is_row_accumulator_support_dtype(&self.data_type) - } - fn groups_accumulator_supported(&self) -> bool { use DataType::*; matches!( @@ -198,16 +190,6 @@ impl AggregateExpr for Max { ) } - fn create_row_accumulator( - &self, - start_index: usize, - ) -> Result> { - Ok(Box::new(MaxRowAccumulator::new( - start_index, - self.data_type.clone(), - ))) - } - fn create_groups_accumulator(&self) -> Result> { use DataType::*; use TimeUnit::*; @@ -457,18 +439,6 @@ macro_rules! typed_min_max { }}; } -// min/max of two non-string scalar values. -macro_rules! typed_min_max_v2 { - ($INDEX:ident, $ACC:ident, $SCALAR:expr, $TYPE:ident, $OP:ident) => {{ - paste::item! { - match $SCALAR { - None => {} - Some(v) => $ACC.[<$OP _ $TYPE>]($INDEX, *v as $TYPE) - } - } - }}; -} - // min/max of two scalar string values. macro_rules! typed_min_max_string { ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident) => {{ @@ -666,77 +636,16 @@ macro_rules! min_max { }}; } -// min/max of two scalar values of the same type -macro_rules! min_max_v2 { - ($INDEX:ident, $ACC:ident, $SCALAR:expr, $OP:ident) => {{ - Ok(match $SCALAR { - ScalarValue::Boolean(rhs) => { - typed_min_max_v2!($INDEX, $ACC, rhs, bool, $OP) - } - ScalarValue::Float64(rhs) => { - typed_min_max_v2!($INDEX, $ACC, rhs, f64, $OP) - } - ScalarValue::Float32(rhs) => { - typed_min_max_v2!($INDEX, $ACC, rhs, f32, $OP) - } - ScalarValue::UInt64(rhs) => { - typed_min_max_v2!($INDEX, $ACC, rhs, u64, $OP) - } - ScalarValue::UInt32(rhs) => { - typed_min_max_v2!($INDEX, $ACC, rhs, u32, $OP) - } - ScalarValue::UInt16(rhs) => { - typed_min_max_v2!($INDEX, $ACC, rhs, u16, $OP) - } - ScalarValue::UInt8(rhs) => { - typed_min_max_v2!($INDEX, $ACC, rhs, u8, $OP) - } - ScalarValue::Int64(rhs) => { - typed_min_max_v2!($INDEX, $ACC, rhs, i64, $OP) - } - ScalarValue::Int32(rhs) => { - typed_min_max_v2!($INDEX, $ACC, rhs, i32, $OP) - } - ScalarValue::Int16(rhs) => { - typed_min_max_v2!($INDEX, $ACC, rhs, i16, $OP) - } - ScalarValue::Int8(rhs) => { - typed_min_max_v2!($INDEX, $ACC, rhs, i8, $OP) - } - ScalarValue::Decimal128(rhs, ..) => { - typed_min_max_v2!($INDEX, $ACC, rhs, i128, $OP) - } - ScalarValue::Null => { - // do nothing - } - e => { - return Err(DataFusionError::Internal(format!( - "MIN/MAX is not expected to receive scalars of incompatible types {:?}", - e - ))) - } - }) - }}; -} - /// the minimum of two scalar values pub fn min(lhs: &ScalarValue, rhs: &ScalarValue) -> Result { min_max!(lhs, rhs, min) } -pub fn min_row(index: usize, accessor: &mut RowAccessor, s: &ScalarValue) -> Result<()> { - min_max_v2!(index, accessor, s, min) -} - /// the maximum of two scalar values pub fn max(lhs: &ScalarValue, rhs: &ScalarValue) -> Result { min_max!(lhs, rhs, max) } -pub fn max_row(index: usize, accessor: &mut RowAccessor, s: &ScalarValue) -> Result<()> { - min_max_v2!(index, accessor, s, max) -} - /// An accumulator to compute the maximum value #[derive(Debug)] pub struct MaxAccumulator { @@ -837,64 +746,6 @@ impl Accumulator for SlidingMaxAccumulator { } } -#[derive(Debug)] -struct MaxRowAccumulator { - index: usize, - data_type: DataType, -} - -impl MaxRowAccumulator { - pub fn new(index: usize, data_type: DataType) -> Self { - Self { index, data_type } - } -} - -impl RowAccumulator for MaxRowAccumulator { - fn update_batch( - &mut self, - values: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - let values = &values[0]; - let delta = &max_batch(values)?; - max_row(self.index, accessor, delta) - } - - fn update_scalar_values( - &mut self, - values: &[ScalarValue], - accessor: &mut RowAccessor, - ) -> Result<()> { - let value = &values[0]; - max_row(self.index, accessor, value) - } - - fn update_scalar( - &mut self, - value: &ScalarValue, - accessor: &mut RowAccessor, - ) -> Result<()> { - max_row(self.index, accessor, value) - } - - fn merge_batch( - &mut self, - states: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - self.update_batch(states, accessor) - } - - fn evaluate(&self, accessor: &RowAccessor) -> Result { - Ok(accessor.get_as_scalar(&self.data_type, self.index)) - } - - #[inline(always)] - fn state_index(&self) -> usize { - self.index - } -} - /// MIN aggregate expression #[derive(Debug, Clone)] pub struct Min { @@ -954,20 +805,6 @@ impl AggregateExpr for Min { &self.name } - fn row_accumulator_supported(&self) -> bool { - is_row_accumulator_support_dtype(&self.data_type) - } - - fn create_row_accumulator( - &self, - start_index: usize, - ) -> Result> { - Ok(Box::new(MinRowAccumulator::new( - start_index, - self.data_type.clone(), - ))) - } - fn groups_accumulator_supported(&self) -> bool { use DataType::*; matches!( @@ -1173,65 +1010,6 @@ impl Accumulator for SlidingMinAccumulator { } } -#[derive(Debug)] -struct MinRowAccumulator { - index: usize, - data_type: DataType, -} - -impl MinRowAccumulator { - pub fn new(index: usize, data_type: DataType) -> Self { - Self { index, data_type } - } -} - -impl RowAccumulator for MinRowAccumulator { - fn update_batch( - &mut self, - values: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - let values = &values[0]; - let delta = &min_batch(values)?; - min_row(self.index, accessor, delta)?; - Ok(()) - } - - fn update_scalar_values( - &mut self, - values: &[ScalarValue], - accessor: &mut RowAccessor, - ) -> Result<()> { - let value = &values[0]; - min_row(self.index, accessor, value) - } - - fn update_scalar( - &mut self, - value: &ScalarValue, - accessor: &mut RowAccessor, - ) -> Result<()> { - min_row(self.index, accessor, value) - } - - fn merge_batch( - &mut self, - states: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - self.update_batch(states, accessor) - } - - fn evaluate(&self, accessor: &RowAccessor) -> Result { - Ok(accessor.get_as_scalar(&self.data_type, self.index)) - } - - #[inline(always)] - fn state_index(&self) -> usize { - self.index - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/physical-expr/src/aggregate/mod.rs b/datafusion/physical-expr/src/aggregate/mod.rs index 21efb3c2f91b..5490b875763a 100644 --- a/datafusion/physical-expr/src/aggregate/mod.rs +++ b/datafusion/physical-expr/src/aggregate/mod.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use crate::aggregate::row_accumulator::RowAccumulator; use crate::expressions::{FirstValue, LastValue, OrderSensitiveArrayAgg}; use crate::{PhysicalExpr, PhysicalSortExpr}; use arrow::datatypes::Field; @@ -50,7 +49,6 @@ pub mod build_in; pub(crate) mod groups_accumulator; mod hyperloglog; pub mod moving_min_max; -pub mod row_accumulator; pub(crate) mod stats; pub(crate) mod stddev; pub(crate) mod sum; @@ -102,25 +100,6 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq { "AggregateExpr: default name" } - /// If the aggregate expression is supported by row format - fn row_accumulator_supported(&self) -> bool { - false - } - - /// RowAccumulator to access/update row-based aggregation state in-place. - /// Currently, row accumulator only supports states of fixed-sized type. - /// - /// We recommend implementing `RowAccumulator` along with the standard `Accumulator`, - /// when its state is of fixed size, as RowAccumulator is more memory efficient and CPU-friendly. - fn create_row_accumulator( - &self, - _start_index: usize, - ) -> Result> { - Err(DataFusionError::NotImplemented(format!( - "RowAccumulator hasn't been implemented for {self:?} yet" - ))) - } - /// If the aggregate expression has a specialized /// [`GroupsAccumulator`] implementation. If this returns true, /// `[Self::create_groups_accumulator`] will be called. diff --git a/datafusion/physical-expr/src/aggregate/row_accumulator.rs b/datafusion/physical-expr/src/aggregate/row_accumulator.rs deleted file mode 100644 index e5282629220f..000000000000 --- a/datafusion/physical-expr/src/aggregate/row_accumulator.rs +++ /dev/null @@ -1,99 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Accumulator over row format - -use arrow::array::ArrayRef; -use arrow_schema::DataType; -use datafusion_common::{Result, ScalarValue}; -use datafusion_row::accessor::RowAccessor; -use std::fmt::Debug; - -/// Row-based accumulator where the internal aggregate state(s) are stored using row format. -/// -/// Unlike the [`datafusion_expr::Accumulator`], the [`RowAccumulator`] does not store the state internally. -/// Instead, it knows how to access/update the state stored in a row via the the provided accessor and -/// its state's starting field index in the row. -/// -/// For example, we are evaluating `SELECT a, sum(b), avg(c), count(d) from GROUP BY a;`, we would have one row used as -/// aggregation state for each distinct `a` value, the index of the first and the only state of `sum(b)` would be 0, -/// the index of the first state of `avg(c)` would be 1, and the index of the first and only state of `cound(d)` would be 3: -/// -/// sum(b) state_index = 0 count(d) state_index = 3 -/// | | -/// v v -/// +--------+----------+--------+----------+ -/// | sum(b) | count(c) | sum(c) | count(d) | -/// +--------+----------+--------+----------+ -/// ^ -/// | -/// avg(c) state_index = 1 -/// -pub trait RowAccumulator: Send + Sync + Debug { - /// updates the accumulator's state from a vector of arrays. - fn update_batch( - &mut self, - values: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()>; - - /// updates the accumulator's state from a vector of Scalar value. - fn update_scalar_values( - &mut self, - values: &[ScalarValue], - accessor: &mut RowAccessor, - ) -> Result<()>; - - /// updates the accumulator's state from a Scalar value. - fn update_scalar( - &mut self, - value: &ScalarValue, - accessor: &mut RowAccessor, - ) -> Result<()>; - - /// updates the accumulator's state from a vector of states. - fn merge_batch( - &mut self, - states: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()>; - - /// returns its value based on its current state. - fn evaluate(&self, accessor: &RowAccessor) -> Result; - - /// State's starting field index in the row. - fn state_index(&self) -> usize; -} - -/// Returns if `data_type` is supported with `RowAccumulator` -pub fn is_row_accumulator_support_dtype(data_type: &DataType) -> bool { - matches!( - data_type, - DataType::Boolean - | DataType::UInt8 - | DataType::UInt16 - | DataType::UInt32 - | DataType::UInt64 - | DataType::Int8 - | DataType::Int16 - | DataType::Int32 - | DataType::Int64 - | DataType::Float32 - | DataType::Float64 - | DataType::Decimal128(_, _) - ) -} diff --git a/datafusion/physical-expr/src/aggregate/sum.rs b/datafusion/physical-expr/src/aggregate/sum.rs index 5f00e594fef5..45e2be7fb4c6 100644 --- a/datafusion/physical-expr/src/aggregate/sum.rs +++ b/datafusion/physical-expr/src/aggregate/sum.rs @@ -23,9 +23,6 @@ use std::ops::AddAssign; use std::sync::Arc; use super::groups_accumulator::prim_op::PrimitiveGroupsAccumulator; -use crate::aggregate::row_accumulator::{ - is_row_accumulator_support_dtype, RowAccumulator, -}; use crate::aggregate::utils::down_cast_any_ref; use crate::expressions::format_state_name; use crate::{AggregateExpr, GroupsAccumulator, PhysicalExpr}; @@ -47,7 +44,6 @@ use arrow_array::types::{ }; use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue}; use datafusion_expr::Accumulator; -use datafusion_row::accessor::RowAccessor; /// SUM aggregate expression #[derive(Debug, Clone)] @@ -138,24 +134,10 @@ impl AggregateExpr for Sum { &self.name } - fn row_accumulator_supported(&self) -> bool { - is_row_accumulator_support_dtype(&self.data_type) - } - fn groups_accumulator_supported(&self) -> bool { true } - fn create_row_accumulator( - &self, - start_index: usize, - ) -> Result> { - Ok(Box::new(SumRowAccumulator::new( - start_index, - self.data_type.clone(), - ))) - } - fn create_groups_accumulator(&self) -> Result> { // instantiate specialized accumulator match self.data_type { @@ -299,101 +281,6 @@ pub(crate) fn sum_batch(values: &ArrayRef, sum_type: &DataType) -> Result {{ - paste::item! { - if let Some(v) = $DELTA { - $ACC.[]($INDEX, *v) - } - } - }}; -} - -macro_rules! avg_row { - ($INDEX:ident, $ACC:ident, $DELTA:expr, $TYPE:ident) => {{ - paste::item! { - if let Some(v) = $DELTA { - $ACC.add_u64($INDEX, 1); - $ACC.[]($INDEX + 1, *v) - } - } - }}; -} - -pub(crate) fn add_to_row( - index: usize, - accessor: &mut RowAccessor, - s: &ScalarValue, -) -> Result<()> { - match s { - ScalarValue::Null => { - // do nothing - } - ScalarValue::Float64(rhs) => { - sum_row!(index, accessor, rhs, f64) - } - ScalarValue::Float32(rhs) => { - sum_row!(index, accessor, rhs, f32) - } - ScalarValue::UInt64(rhs) => { - sum_row!(index, accessor, rhs, u64) - } - ScalarValue::Int64(rhs) => { - sum_row!(index, accessor, rhs, i64) - } - ScalarValue::Decimal128(rhs, _, _) => { - sum_row!(index, accessor, rhs, i128) - } - ScalarValue::Dictionary(_, value) => { - let value = value.as_ref(); - return add_to_row(index, accessor, value); - } - _ => { - let msg = - format!("Row sum updater is not expected to receive a scalar {s:?}"); - return Err(DataFusionError::Internal(msg)); - } - } - Ok(()) -} - -pub(crate) fn update_avg_to_row( - index: usize, - accessor: &mut RowAccessor, - s: &ScalarValue, -) -> Result<()> { - match s { - ScalarValue::Null => { - // do nothing - } - ScalarValue::Float64(rhs) => { - avg_row!(index, accessor, rhs, f64) - } - ScalarValue::Float32(rhs) => { - avg_row!(index, accessor, rhs, f32) - } - ScalarValue::UInt64(rhs) => { - avg_row!(index, accessor, rhs, u64) - } - ScalarValue::Int64(rhs) => { - avg_row!(index, accessor, rhs, i64) - } - ScalarValue::Decimal128(rhs, _, _) => { - avg_row!(index, accessor, rhs, i128) - } - ScalarValue::Dictionary(_, value) => { - let value = value.as_ref(); - return update_avg_to_row(index, accessor, value); - } - _ => { - let msg = - format!("Row avg updater is not expected to receive a scalar {s:?}"); - return Err(DataFusionError::Internal(msg)); - } - } - Ok(()) -} - impl Accumulator for SumAccumulator { fn state(&self) -> Result> { Ok(vec![self.sum.clone()]) @@ -467,73 +354,14 @@ impl Accumulator for SlidingSumAccumulator { } } -#[derive(Debug)] -struct SumRowAccumulator { - index: usize, - datatype: DataType, -} - -impl SumRowAccumulator { - pub fn new(index: usize, datatype: DataType) -> Self { - Self { index, datatype } - } -} - -impl RowAccumulator for SumRowAccumulator { - fn update_batch( - &mut self, - values: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - let values = &values[0]; - let delta = sum_batch(values, &self.datatype)?; - add_to_row(self.index, accessor, &delta) - } - - fn update_scalar_values( - &mut self, - values: &[ScalarValue], - accessor: &mut RowAccessor, - ) -> Result<()> { - let value = &values[0]; - add_to_row(self.index, accessor, value) - } - - fn update_scalar( - &mut self, - value: &ScalarValue, - accessor: &mut RowAccessor, - ) -> Result<()> { - add_to_row(self.index, accessor, value) - } - - fn merge_batch( - &mut self, - states: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - self.update_batch(states, accessor) - } - - fn evaluate(&self, accessor: &RowAccessor) -> Result { - Ok(accessor.get_as_scalar(&self.datatype, self.index)) - } - - #[inline(always)] - fn state_index(&self) -> usize { - self.index - } -} - #[cfg(test)] mod tests { use super::*; + use crate::expressions::col; use crate::expressions::tests::aggregate; - use crate::expressions::{col, Avg}; use crate::generic_test_op; use arrow::datatypes::*; use arrow::record_batch::RecordBatch; - use arrow_array::DictionaryArray; use datafusion_common::Result; #[test] @@ -656,75 +484,4 @@ mod tests { Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 5_f64])); generic_test_op!(a, DataType::Float64, Sum, ScalarValue::from(15_f64)) } - - fn row_aggregate( - array: &ArrayRef, - agg: Arc, - row_accessor: &mut RowAccessor, - row_indexs: Vec, - ) -> Result { - let mut accum = agg.create_row_accumulator(0)?; - - for row_index in row_indexs { - let scalar_value = ScalarValue::try_from_array(array, row_index)?; - accum.update_scalar(&scalar_value, row_accessor)?; - } - accum.evaluate(row_accessor) - } - - #[test] - fn sum_dictionary_f64() -> Result<()> { - let keys = Int32Array::from(vec![2, 3, 1, 0, 1]); - let values = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64])); - - let a: ArrayRef = Arc::new(DictionaryArray::try_new(keys, values).unwrap()); - - let row_schema = Schema::new(vec![Field::new("a", DataType::Float64, true)]); - let mut row_accessor = RowAccessor::new(&row_schema); - let mut buffer: Vec = vec![0; 16]; - row_accessor.point_to(0, &mut buffer); - - let expected = ScalarValue::from(9_f64); - - let agg = Arc::new(Sum::new( - col("a", &row_schema)?, - "bla".to_string(), - expected.get_datatype(), - )); - - let actual = row_aggregate(&a, agg, &mut row_accessor, vec![0, 1, 2])?; - assert_eq!(expected, actual); - - Ok(()) - } - - #[test] - fn avg_dictionary_f64() -> Result<()> { - let keys = Int32Array::from(vec![2, 1, 1, 3, 0]); - let values = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64])); - - let a: ArrayRef = Arc::new(DictionaryArray::try_new(keys, values).unwrap()); - - let row_schema = Schema::new(vec![ - Field::new("count", DataType::UInt64, true), - Field::new("a", DataType::Float64, true), - ]); - let mut row_accessor = RowAccessor::new(&row_schema); - let mut buffer: Vec = vec![0; 24]; - row_accessor.point_to(0, &mut buffer); - - let expected = ScalarValue::from(2.3333333333333335_f64); - - let schema = Schema::new(vec![Field::new("a", DataType::Float64, true)]); - let agg = Arc::new(Avg::new( - col("a", &schema)?, - "bla".to_string(), - expected.get_datatype(), - )); - - let actual = row_aggregate(&a, agg, &mut row_accessor, vec![0, 1, 2])?; - assert_eq!(expected, actual); - - Ok(()) - } } From 22370ea946eaccab1dc45d0b357cf5ee5817589d Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 14 Jul 2023 15:41:00 -0400 Subject: [PATCH 2/4] Remove `datafusion-row` --- Cargo.toml | 2 +- datafusion/core/Cargo.toml | 1 - datafusion/core/src/lib.rs | 5 - datafusion/core/tests/row.rs | 97 ------- datafusion/physical-expr/Cargo.toml | 1 - datafusion/row/Cargo.toml | 39 --- datafusion/row/README.md | 29 --- datafusion/row/src/accessor.rs | 384 ---------------------------- datafusion/row/src/layout.rs | 157 ------------ datafusion/row/src/lib.rs | 303 ---------------------- datafusion/row/src/reader.rs | 366 -------------------------- datafusion/row/src/validity.rs | 161 ------------ datafusion/row/src/writer.rs | 333 ------------------------ 13 files changed, 1 insertion(+), 1877 deletions(-) delete mode 100644 datafusion/core/tests/row.rs delete mode 100644 datafusion/row/Cargo.toml delete mode 100644 datafusion/row/README.md delete mode 100644 datafusion/row/src/accessor.rs delete mode 100644 datafusion/row/src/layout.rs delete mode 100644 datafusion/row/src/lib.rs delete mode 100644 datafusion/row/src/reader.rs delete mode 100644 datafusion/row/src/validity.rs delete mode 100644 datafusion/row/src/writer.rs diff --git a/Cargo.toml b/Cargo.toml index 2db0379b0657..7029c2869d9e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ [workspace] exclude = ["datafusion-cli"] -members = ["datafusion/common", "datafusion/core", "datafusion/expr", "datafusion/execution", "datafusion/optimizer", "datafusion/physical-expr", "datafusion/proto", "datafusion/proto/gen", "datafusion/row", "datafusion/sql", "datafusion/substrait", "datafusion-examples", "test-utils", "benchmarks", +members = ["datafusion/common", "datafusion/core", "datafusion/expr", "datafusion/execution", "datafusion/optimizer", "datafusion/physical-expr", "datafusion/proto", "datafusion/proto/gen", "datafusion/sql", "datafusion/substrait", "datafusion-examples", "test-utils", "benchmarks", ] resolver = "2" diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 862d41a889f3..a95a8266052f 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -67,7 +67,6 @@ datafusion-execution = { path = "../execution", version = "27.0.0" } datafusion-expr = { path = "../expr", version = "27.0.0" } datafusion-optimizer = { path = "../optimizer", version = "27.0.0", default-features = false } datafusion-physical-expr = { path = "../physical-expr", version = "27.0.0", default-features = false } -datafusion-row = { path = "../row", version = "27.0.0" } datafusion-sql = { path = "../sql", version = "27.0.0" } flate2 = { version = "1.0.24", optional = true } futures = "0.3" diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index 0b431cc1018d..8a7e39301a9d 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -468,11 +468,6 @@ pub mod physical_expr { pub use datafusion_physical_expr::*; } -/// re-export of [`datafusion_row`] crate -pub mod row { - pub use datafusion_row::*; -} - /// re-export of [`datafusion_sql`] crate pub mod sql { pub use datafusion_sql::*; diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs deleted file mode 100644 index c68b422a4f06..000000000000 --- a/datafusion/core/tests/row.rs +++ /dev/null @@ -1,97 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use datafusion::datasource::file_format::parquet::ParquetFormat; -use datafusion::datasource::file_format::FileFormat; -use datafusion::datasource::object_store::ObjectStoreUrl; -use datafusion::datasource::physical_plan::FileScanConfig; -use datafusion::error::Result; -use datafusion::execution::context::SessionState; -use datafusion::physical_plan::{collect, ExecutionPlan}; -use datafusion::prelude::SessionContext; -use datafusion_row::reader::read_as_batch; -use datafusion_row::writer::write_batch_unchecked; -use object_store::{local::LocalFileSystem, path::Path, ObjectStore}; -use std::sync::Arc; - -#[tokio::test] -async fn test_with_parquet() -> Result<()> { - let ctx = SessionContext::new(); - let state = ctx.state(); - let task_ctx = state.task_ctx(); - let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7]); - let exec = - get_exec(&state, "alltypes_plain.parquet", projection.as_ref(), None).await?; - let schema = exec.schema().clone(); - - let batches = collect(exec, task_ctx).await?; - assert_eq!(1, batches.len()); - let batch = &batches[0]; - - let mut vector = vec![0; 20480]; - let row_offsets = { write_batch_unchecked(&mut vector, 0, batch, 0, schema.clone()) }; - let output_batch = { read_as_batch(&vector, schema, &row_offsets)? }; - assert_eq!(*batch, output_batch); - - Ok(()) -} - -async fn get_exec( - state: &SessionState, - file_name: &str, - projection: Option<&Vec>, - limit: Option, -) -> Result> { - let testdata = datafusion::test_util::parquet_test_data(); - let filename = format!("{testdata}/{file_name}"); - - let path = Path::from_filesystem_path(filename).unwrap(); - - let format = ParquetFormat::default(); - let object_store = Arc::new(LocalFileSystem::new()) as Arc; - let object_store_url = ObjectStoreUrl::local_filesystem(); - - let meta = object_store.head(&path).await.unwrap(); - - let file_schema = format - .infer_schema(state, &object_store, &[meta.clone()]) - .await - .expect("Schema inference"); - let statistics = format - .infer_stats(state, &object_store, file_schema.clone(), &meta) - .await - .expect("Stats inference"); - let file_groups = vec![vec![meta.into()]]; - let exec = format - .create_physical_plan( - state, - FileScanConfig { - object_store_url, - file_schema, - file_groups, - statistics, - projection: projection.cloned(), - limit, - table_partition_cols: vec![], - output_ordering: vec![], - infinite_source: false, - }, - None, - ) - .await?; - Ok(exec) -} diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index b7ffa1810cce..02958a543c37 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -54,7 +54,6 @@ blake3 = { version = "1.0", optional = true } chrono = { version = "0.4.23", default-features = false } datafusion-common = { path = "../common", version = "27.0.0" } datafusion-expr = { path = "../expr", version = "27.0.0" } -datafusion-row = { path = "../row", version = "27.0.0" } half = { version = "2.1", default-features = false } hashbrown = { version = "0.14", features = ["raw"] } hex = { version = "0.4", optional = true } diff --git a/datafusion/row/Cargo.toml b/datafusion/row/Cargo.toml deleted file mode 100644 index 4d34a5da2fd1..000000000000 --- a/datafusion/row/Cargo.toml +++ /dev/null @@ -1,39 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -[package] -name = "datafusion-row" -description = "Row backed by raw bytes for DataFusion query engine" -keywords = [ "arrow", "query", "sql" ] -version = { workspace = true } -edition = { workspace = true } -readme = { workspace = true } -homepage = { workspace = true } -repository = { workspace = true } -license = { workspace = true } -authors = { workspace = true } -rust-version = { workspace = true } - -[lib] -name = "datafusion_row" -path = "src/lib.rs" - -[dependencies] -arrow = { workspace = true } -datafusion-common = { path = "../common", version = "27.0.0" } -paste = "^1.0" -rand = "0.8" diff --git a/datafusion/row/README.md b/datafusion/row/README.md deleted file mode 100644 index eef4dfd554e3..000000000000 --- a/datafusion/row/README.md +++ /dev/null @@ -1,29 +0,0 @@ - - -# DataFusion Row - -[DataFusion](df) is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory format. - -This crate is a submodule of DataFusion that provides an optimized row based format for row-based operations. - -See the documentation in [`lib.rs`] for more details. - -[df]: https://crates.io/crates/datafusion -[`lib.rs`]: https://github.com/apache/arrow-datafusion/blob/main/datafusion/row/src/lib.rs diff --git a/datafusion/row/src/accessor.rs b/datafusion/row/src/accessor.rs deleted file mode 100644 index a0b5a70df993..000000000000 --- a/datafusion/row/src/accessor.rs +++ /dev/null @@ -1,384 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! [`RowAccessor`] provides a Read/Write/Modify access for row with all fixed-sized fields: - -use crate::layout::RowLayout; -use crate::validity::NullBitsFormatter; -use crate::{fn_get_idx, fn_get_idx_opt, fn_set_idx}; -use arrow::datatypes::{DataType, Schema}; -use arrow::util::bit_util::{get_bit_raw, set_bit_raw}; -use datafusion_common::ScalarValue; -use std::ops::{BitAnd, BitOr, BitXor}; -use std::sync::Arc; - -//TODO: DRY with reader and writer - -/// Provides read/write/modify access to a tuple stored in Row format -/// at `data[base_offset..]` -/// -/// ```text -/// Set / Update data -/// in [u8] -/// ─ ─ ─ ─ ─ ─ ─ ┐ Read data out as native -/// │ types or ScalarValues -/// │ -/// │ ┌───────────────────────┐ -/// │ │ -/// └ ▶│ [u8] │─ ─ ─ ─ ─ ─ ─ ─▶ -/// │ │ -/// └───────────────────────┘ -/// ``` -pub struct RowAccessor<'a> { - /// Layout on how to read each field - layout: Arc, - /// Raw bytes slice where the tuple stores - data: &'a mut [u8], - /// Start position for the current tuple in the raw bytes slice. - base_offset: usize, -} - -impl<'a> std::fmt::Debug for RowAccessor<'a> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - if self.null_free() { - write!(f, "null_free") - } else { - let null_bits = self.null_bits(); - write!( - f, - "{:?}", - NullBitsFormatter::new(null_bits, self.layout.field_count) - ) - } - } -} - -#[macro_export] -macro_rules! fn_add_idx { - ($NATIVE: ident) => { - paste::item! { - /// add field at `idx` with `value` - #[inline(always)] - pub fn [](&mut self, idx: usize, value: $NATIVE) { - if self.is_valid_at(idx) { - self.[](idx, value + self.[](idx)); - } else { - self.set_non_null_at(idx); - self.[](idx, value); - } - } - } - }; -} - -macro_rules! fn_max_min_idx { - ($NATIVE: ident, $OP: ident) => { - paste::item! { - /// check max then update - #[inline(always)] - pub fn [<$OP _ $NATIVE>](&mut self, idx: usize, value: $NATIVE) { - if self.is_valid_at(idx) { - let v = value.$OP(self.[](idx)); - self.[](idx, v); - } else { - self.set_non_null_at(idx); - self.[](idx, value); - } - } - } - }; -} - -macro_rules! fn_bit_and_or_xor_idx { - ($NATIVE: ident, $OP: ident) => { - paste::item! { - /// check bit_and then update - #[inline(always)] - pub fn [<$OP _ $NATIVE>](&mut self, idx: usize, value: $NATIVE) { - if self.is_valid_at(idx) { - let v = value.$OP(self.[](idx)); - self.[](idx, v); - } else { - self.set_non_null_at(idx); - self.[](idx, value); - } - } - } - }; -} - -macro_rules! fn_get_idx_scalar { - ($NATIVE: ident, $SCALAR:ident) => { - paste::item! { - #[inline(always)] - pub fn [](&self, idx: usize) -> ScalarValue { - if self.is_valid_at(idx) { - ScalarValue::$SCALAR(Some(self.[](idx))) - } else { - ScalarValue::$SCALAR(None) - } - } - } - }; -} - -impl<'a> RowAccessor<'a> { - /// new - pub fn new(schema: &Schema) -> Self { - Self { - layout: Arc::new(RowLayout::new(schema)), - data: &mut [], - base_offset: 0, - } - } - - pub fn new_from_layout(layout: Arc) -> Self { - Self { - layout, - data: &mut [], - base_offset: 0, - } - } - - /// Update this row to point to position `offset` in `base` - pub fn point_to(&mut self, offset: usize, data: &'a mut [u8]) { - self.base_offset = offset; - self.data = data; - } - - #[inline] - fn assert_index_valid(&self, idx: usize) { - assert!(idx < self.layout.field_count); - } - - #[inline(always)] - fn field_offsets(&self) -> &[usize] { - &self.layout.field_offsets - } - - #[inline(always)] - fn null_free(&self) -> bool { - self.layout.null_free - } - - #[inline(always)] - fn null_bits(&self) -> &[u8] { - if self.null_free() { - &[] - } else { - let start = self.base_offset; - &self.data[start..start + self.layout.null_width] - } - } - - fn is_valid_at(&self, idx: usize) -> bool { - unsafe { get_bit_raw(self.null_bits().as_ptr(), idx) } - } - - // ------------------------------ - // ----- Fixed Sized getters ---- - // ------------------------------ - - fn get_bool(&self, idx: usize) -> bool { - self.assert_index_valid(idx); - let offset = self.field_offsets()[idx]; - let value = &self.data[self.base_offset + offset..]; - value[0] != 0 - } - - fn get_u8(&self, idx: usize) -> u8 { - self.assert_index_valid(idx); - let offset = self.field_offsets()[idx]; - self.data[self.base_offset + offset] - } - - fn_get_idx!(u16, 2); - fn_get_idx!(u32, 4); - fn_get_idx!(u64, 8); - fn_get_idx!(i8, 1); - fn_get_idx!(i16, 2); - fn_get_idx!(i32, 4); - fn_get_idx!(i64, 8); - fn_get_idx!(f32, 4); - fn_get_idx!(f64, 8); - fn_get_idx!(i128, 16); - - fn_get_idx_opt!(bool); - fn_get_idx_opt!(u8); - fn_get_idx_opt!(u16); - fn_get_idx_opt!(u32); - fn_get_idx_opt!(u64); - fn_get_idx_opt!(i8); - fn_get_idx_opt!(i16); - fn_get_idx_opt!(i32); - fn_get_idx_opt!(i64); - fn_get_idx_opt!(f32); - fn_get_idx_opt!(f64); - fn_get_idx_opt!(i128); - - fn_get_idx_scalar!(bool, Boolean); - fn_get_idx_scalar!(u8, UInt8); - fn_get_idx_scalar!(u16, UInt16); - fn_get_idx_scalar!(u32, UInt32); - fn_get_idx_scalar!(u64, UInt64); - fn_get_idx_scalar!(i8, Int8); - fn_get_idx_scalar!(i16, Int16); - fn_get_idx_scalar!(i32, Int32); - fn_get_idx_scalar!(i64, Int64); - fn_get_idx_scalar!(f32, Float32); - fn_get_idx_scalar!(f64, Float64); - - fn get_decimal128_scalar(&self, idx: usize, p: u8, s: i8) -> ScalarValue { - if self.is_valid_at(idx) { - ScalarValue::Decimal128(Some(self.get_i128(idx)), p, s) - } else { - ScalarValue::Decimal128(None, p, s) - } - } - - pub fn get_as_scalar(&self, dt: &DataType, index: usize) -> ScalarValue { - match dt { - DataType::Boolean => self.get_bool_scalar(index), - DataType::Int8 => self.get_i8_scalar(index), - DataType::Int16 => self.get_i16_scalar(index), - DataType::Int32 => self.get_i32_scalar(index), - DataType::Int64 => self.get_i64_scalar(index), - DataType::UInt8 => self.get_u8_scalar(index), - DataType::UInt16 => self.get_u16_scalar(index), - DataType::UInt32 => self.get_u32_scalar(index), - DataType::UInt64 => self.get_u64_scalar(index), - DataType::Float32 => self.get_f32_scalar(index), - DataType::Float64 => self.get_f64_scalar(index), - DataType::Decimal128(p, s) => self.get_decimal128_scalar(index, *p, *s), - _ => unreachable!(), - } - } - - // ------------------------------ - // ----- Fixed Sized setters ---- - // ------------------------------ - - pub(crate) fn set_non_null_at(&mut self, idx: usize) { - assert!( - !self.null_free(), - "Unexpected call to set_non_null_at on null-free row writer" - ); - let null_bits = &mut self.data[0..self.layout.null_width]; - unsafe { - set_bit_raw(null_bits.as_mut_ptr(), idx); - } - } - - fn set_bool(&mut self, idx: usize, value: bool) { - self.assert_index_valid(idx); - let offset = self.field_offsets()[idx]; - self.data[offset] = u8::from(value); - } - - fn set_u8(&mut self, idx: usize, value: u8) { - self.assert_index_valid(idx); - let offset = self.field_offsets()[idx]; - self.data[offset] = value; - } - - fn_set_idx!(u16, 2); - fn_set_idx!(u32, 4); - fn_set_idx!(u64, 8); - fn_set_idx!(i16, 2); - fn_set_idx!(i32, 4); - fn_set_idx!(i64, 8); - fn_set_idx!(f32, 4); - fn_set_idx!(f64, 8); - fn_set_idx!(i128, 16); - - fn set_i8(&mut self, idx: usize, value: i8) { - self.assert_index_valid(idx); - let offset = self.field_offsets()[idx]; - self.data[offset] = value.to_le_bytes()[0]; - } - - // ------------------------------ - // ---- Fixed sized updaters ---- - // ------------------------------ - - fn_add_idx!(u8); - fn_add_idx!(u16); - fn_add_idx!(u32); - fn_add_idx!(u64); - fn_add_idx!(i8); - fn_add_idx!(i16); - fn_add_idx!(i32); - fn_add_idx!(i64); - fn_add_idx!(f32); - fn_add_idx!(f64); - fn_add_idx!(i128); - - fn_max_min_idx!(bool, max); - fn_max_min_idx!(u8, max); - fn_max_min_idx!(u16, max); - fn_max_min_idx!(u32, max); - fn_max_min_idx!(u64, max); - fn_max_min_idx!(i8, max); - fn_max_min_idx!(i16, max); - fn_max_min_idx!(i32, max); - fn_max_min_idx!(i64, max); - fn_max_min_idx!(f32, max); - fn_max_min_idx!(f64, max); - fn_max_min_idx!(i128, max); - - fn_max_min_idx!(bool, min); - fn_max_min_idx!(u8, min); - fn_max_min_idx!(u16, min); - fn_max_min_idx!(u32, min); - fn_max_min_idx!(u64, min); - fn_max_min_idx!(i8, min); - fn_max_min_idx!(i16, min); - fn_max_min_idx!(i32, min); - fn_max_min_idx!(i64, min); - fn_max_min_idx!(f32, min); - fn_max_min_idx!(f64, min); - fn_max_min_idx!(i128, min); - - fn_bit_and_or_xor_idx!(bool, bitand); - fn_bit_and_or_xor_idx!(u8, bitand); - fn_bit_and_or_xor_idx!(u16, bitand); - fn_bit_and_or_xor_idx!(u32, bitand); - fn_bit_and_or_xor_idx!(u64, bitand); - fn_bit_and_or_xor_idx!(i8, bitand); - fn_bit_and_or_xor_idx!(i16, bitand); - fn_bit_and_or_xor_idx!(i32, bitand); - fn_bit_and_or_xor_idx!(i64, bitand); - - fn_bit_and_or_xor_idx!(bool, bitor); - fn_bit_and_or_xor_idx!(u8, bitor); - fn_bit_and_or_xor_idx!(u16, bitor); - fn_bit_and_or_xor_idx!(u32, bitor); - fn_bit_and_or_xor_idx!(u64, bitor); - fn_bit_and_or_xor_idx!(i8, bitor); - fn_bit_and_or_xor_idx!(i16, bitor); - fn_bit_and_or_xor_idx!(i32, bitor); - fn_bit_and_or_xor_idx!(i64, bitor); - - fn_bit_and_or_xor_idx!(u8, bitxor); - fn_bit_and_or_xor_idx!(u16, bitxor); - fn_bit_and_or_xor_idx!(u32, bitxor); - fn_bit_and_or_xor_idx!(u64, bitxor); - fn_bit_and_or_xor_idx!(i8, bitxor); - fn_bit_and_or_xor_idx!(i16, bitxor); - fn_bit_and_or_xor_idx!(i32, bitxor); - fn_bit_and_or_xor_idx!(i64, bitxor); -} diff --git a/datafusion/row/src/layout.rs b/datafusion/row/src/layout.rs deleted file mode 100644 index 71471327536a..000000000000 --- a/datafusion/row/src/layout.rs +++ /dev/null @@ -1,157 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Various row layouts for different use case - -use crate::schema_null_free; -use arrow::datatypes::{DataType, Schema}; -use arrow::util::bit_util::{ceil, round_upto_power_of_2}; - -/// Row layout stores one or multiple 8-byte word(s) per field for CPU-friendly -/// and efficient processing. -/// -/// It is mainly used to represent the rows with frequently updated content, -/// for example, grouping state for hash aggregation. -/// -/// Each tuple consists of two parts: "`null bit set`" and "`values`". -/// -/// For null-free tuples, the null bit set can be omitted. -/// -/// The null bit set, when present, is aligned to 8 bytes. It stores one bit per field. -/// -/// In the region of the values, we store the fields in the order they are defined in the schema. -/// Each field is stored in one or multiple 8-byte words. -/// -/// ```plaintext -/// ┌─────────────────┬─────────────────────┐ -/// │Validity Bitmask │ Fields │ -/// │ (8-byte aligned)│ (8-byte words) │ -/// └─────────────────┴─────────────────────┘ -/// ``` -/// -/// For example, given the schema (Int8, Float32, Int64) with a null-free tuple -/// -/// Encoding the tuple (1, 3.14, 42) -/// -/// Requires 24 bytes (3 fields * 8 bytes each): -/// -/// ```plaintext -/// ┌──────────────────────┬──────────────────────┬──────────────────────┐ -/// │ 0x01 │ 0x4048F5C3 │ 0x0000002A │ -/// └──────────────────────┴──────────────────────┴──────────────────────┘ -/// 0 8 16 24 -/// ``` -/// -/// If the schema allows null values and the tuple is (1, NULL, 42) -/// -/// Encoding the tuple requires 32 bytes (1 * 8 bytes for the null bit set + 3 fields * 8 bytes each): -/// -/// ```plaintext -/// ┌──────────────────────────┬──────────────────────┬──────────────────────┬──────────────────────┐ -/// │ 0b00000101 │ 0x01 │ 0x00000000 │ 0x0000002A │ -/// │ (7 bytes padding after) │ │ │ │ -/// └──────────────────────────┴──────────────────────┴──────────────────────┴──────────────────────┘ -/// 0 8 16 24 32 -/// ``` -#[derive(Debug, Clone)] -pub struct RowLayout { - /// If a row is null free according to its schema - pub(crate) null_free: bool, - /// The number of bytes used to store null bits for each field. - pub(crate) null_width: usize, - /// Length in bytes for `values` part of the current tuple. - pub(crate) values_width: usize, - /// Total number of fields for each tuple. - pub(crate) field_count: usize, - /// Starting offset for each fields in the raw bytes. - pub(crate) field_offsets: Vec, -} - -impl RowLayout { - /// new - pub fn new(schema: &Schema) -> Self { - assert!( - row_supported(schema), - "Row with {schema:?} not supported yet.", - ); - let null_free = schema_null_free(schema); - let field_count = schema.fields().len(); - let null_width = if null_free { - 0 - } else { - round_upto_power_of_2(ceil(field_count, 8), 8) - }; - let (field_offsets, values_width) = word_aligned_offsets(null_width, schema); - Self { - null_free, - null_width, - values_width, - field_count, - field_offsets, - } - } - - /// Get fixed part width for this layout - #[inline(always)] - pub fn fixed_part_width(&self) -> usize { - self.null_width + self.values_width - } -} - -fn word_aligned_offsets(null_width: usize, schema: &Schema) -> (Vec, usize) { - let mut offsets = vec![]; - let mut offset = null_width; - for f in schema.fields() { - offsets.push(offset); - assert!(!matches!(f.data_type(), DataType::Decimal256(_, _))); - // All of the current support types can fit into one single 8-bytes word except for Decimal128. - // For Decimal128, its width is of two 8-bytes words. - match f.data_type() { - DataType::Decimal128(_, _) => offset += 16, - _ => offset += 8, - } - } - (offsets, offset - null_width) -} - -/// Return true of data in `schema` can be converted to raw-bytes -/// based rows. -/// -/// Note all schemas can be supported in the row format -pub fn row_supported(schema: &Schema) -> bool { - schema.fields().iter().all(|f| { - let dt = f.data_type(); - use DataType::*; - matches!( - dt, - Boolean - | UInt8 - | UInt16 - | UInt32 - | UInt64 - | Int8 - | Int16 - | Int32 - | Int64 - | Float32 - | Float64 - | Date32 - | Date64 - | Decimal128(_, _) - ) - }) -} diff --git a/datafusion/row/src/lib.rs b/datafusion/row/src/lib.rs deleted file mode 100644 index 902fa881b19b..000000000000 --- a/datafusion/row/src/lib.rs +++ /dev/null @@ -1,303 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! This module contains code to translate arrays back and forth to a -//! row based format. The row based format is backed by raw bytes -//! ([`[u8]`]) and used to optimize certain operations. -//! -//! In general, DataFusion is a so called "vectorized" execution -//! model, specifically it uses the optimized calculation kernels in -//! [`arrow`] to amortize dispatch overhead. -//! -//! However, as mentioned in [this paper], there are some "row -//! oriented" operations in a database that are not typically amenable -//! to vectorization. The "classics" are: hash table updates in joins -//! and hash aggregates, as well as comparing tuples in sort / -//! merging. -//! -//! [this paper]: https://db.in.tum.de/~kersten/vectorization_vs_compilation.pdf - -use arrow::array::{make_builder, ArrayBuilder, ArrayRef}; -use arrow::datatypes::Schema; -use arrow::error::Result as ArrowResult; -use arrow::record_batch::RecordBatch; -pub use layout::row_supported; -use std::sync::Arc; - -pub mod accessor; -pub mod layout; -pub mod reader; -mod validity; -pub mod writer; - -/// Tell if schema contains no nullable field -pub(crate) fn schema_null_free(schema: &Schema) -> bool { - schema.fields().iter().all(|f| !f.is_nullable()) -} - -/// Columnar Batch buffer that assists creating `RecordBatches` -pub struct MutableRecordBatch { - arrays: Vec>, - schema: Arc, -} - -impl MutableRecordBatch { - /// new - pub fn new(target_batch_size: usize, schema: Arc) -> Self { - let arrays = new_arrays(&schema, target_batch_size); - Self { arrays, schema } - } - - /// Finalize the batch, output and reset this buffer - pub fn output(&mut self) -> ArrowResult { - let result = make_batch(self.schema.clone(), self.arrays.drain(..).collect()); - result - } - - pub fn output_as_columns(&mut self) -> Vec { - get_columns(self.arrays.drain(..).collect()) - } -} - -fn new_arrays(schema: &Schema, batch_size: usize) -> Vec> { - schema - .fields() - .iter() - .map(|field| { - let dt = field.data_type(); - make_builder(dt, batch_size) - }) - .collect::>() -} - -fn make_batch( - schema: Arc, - mut arrays: Vec>, -) -> ArrowResult { - let columns = arrays.iter_mut().map(|array| array.finish()).collect(); - RecordBatch::try_new(schema, columns) -} - -fn get_columns(mut arrays: Vec>) -> Vec { - arrays.iter_mut().map(|array| array.finish()).collect() -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::layout::RowLayout; - use crate::reader::read_as_batch; - use crate::writer::write_batch_unchecked; - use arrow::record_batch::RecordBatch; - use arrow::{array::*, datatypes::*}; - use datafusion_common::Result; - use DataType::*; - - macro_rules! fn_test_single_type { - ($ARRAY: ident, $TYPE: expr, $VEC: expr) => { - paste::item! { - #[test] - #[allow(non_snake_case)] - fn []() -> Result<()> { - let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, true)])); - let record_width = RowLayout::new(schema.as_ref()).fixed_part_width(); - let a = $ARRAY::from($VEC); - let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; - let mut vector = vec![0; record_width * batch.num_rows()]; - let row_offsets = - { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) }; - let output_batch = { read_as_batch(&vector, schema, &row_offsets)? }; - assert_eq!(batch, output_batch); - Ok(()) - } - - #[test] - #[allow(non_snake_case)] - fn []() -> Result<()> { - let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, false)])); - let record_width = RowLayout::new(schema.as_ref()).fixed_part_width(); - let v = $VEC.into_iter().filter(|o| o.is_some()).collect::>(); - let a = $ARRAY::from(v); - let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; - let mut vector = vec![0; record_width * batch.num_rows()]; - let row_offsets = - { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) }; - let output_batch = { read_as_batch(&vector, schema, &row_offsets)? }; - assert_eq!(batch, output_batch); - Ok(()) - } - } - }; - } - - fn_test_single_type!( - BooleanArray, - Boolean, - vec![Some(true), Some(false), None, Some(true), None] - ); - - fn_test_single_type!( - Int8Array, - Int8, - vec![Some(5), Some(7), None, Some(0), Some(111)] - ); - - fn_test_single_type!( - Int16Array, - Int16, - vec![Some(5), Some(7), None, Some(0), Some(111)] - ); - - fn_test_single_type!( - Int32Array, - Int32, - vec![Some(5), Some(7), None, Some(0), Some(111)] - ); - - fn_test_single_type!( - Int64Array, - Int64, - vec![Some(5), Some(7), None, Some(0), Some(111)] - ); - - fn_test_single_type!( - UInt8Array, - UInt8, - vec![Some(5), Some(7), None, Some(0), Some(111)] - ); - - fn_test_single_type!( - UInt16Array, - UInt16, - vec![Some(5), Some(7), None, Some(0), Some(111)] - ); - - fn_test_single_type!( - UInt32Array, - UInt32, - vec![Some(5), Some(7), None, Some(0), Some(111)] - ); - - fn_test_single_type!( - UInt64Array, - UInt64, - vec![Some(5), Some(7), None, Some(0), Some(111)] - ); - - fn_test_single_type!( - Float32Array, - Float32, - vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)] - ); - - fn_test_single_type!( - Float64Array, - Float64, - vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)] - ); - - fn_test_single_type!( - Date32Array, - Date32, - vec![Some(5), Some(7), None, Some(0), Some(111)] - ); - - fn_test_single_type!( - Date64Array, - Date64, - vec![Some(5), Some(7), None, Some(0), Some(111)] - ); - - #[test] - fn test_single_decimal128() -> Result<()> { - let v = vec![ - Some(0), - Some(1), - None, - Some(-1), - Some(i128::MIN), - Some(i128::MAX), - ]; - let schema = - Arc::new(Schema::new(vec![Field::new("a", Decimal128(38, 10), true)])); - let record_width = RowLayout::new(schema.as_ref()).fixed_part_width(); - let a = Decimal128Array::from(v); - let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; - let mut vector = vec![0; record_width * batch.num_rows()]; - let row_offsets = - { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) }; - let output_batch = { read_as_batch(&vector, schema, &row_offsets)? }; - assert_eq!(batch, output_batch); - Ok(()) - } - - #[test] - fn test_single_decimal128_null_free() -> Result<()> { - let v = vec![ - Some(0), - Some(1), - None, - Some(-1), - Some(i128::MIN), - Some(i128::MAX), - ]; - let schema = Arc::new(Schema::new(vec![Field::new( - "a", - Decimal128(38, 10), - false, - )])); - let record_width = RowLayout::new(schema.as_ref()).fixed_part_width(); - let v = v.into_iter().filter(|o| o.is_some()).collect::>(); - let a = Decimal128Array::from(v); - let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; - let mut vector = vec![0; record_width * batch.num_rows()]; - let row_offsets = - { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) }; - let output_batch = { read_as_batch(&vector, schema, &row_offsets)? }; - assert_eq!(batch, output_batch); - Ok(()) - } - - #[test] - #[should_panic(expected = "not supported yet")] - fn test_unsupported_type() { - let a: ArrayRef = Arc::new(StringArray::from(vec!["hello", "world"])); - let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap(); - let schema = batch.schema(); - let mut vector = vec![0; 1024]; - write_batch_unchecked(&mut vector, 0, &batch, 0, schema); - } - - #[test] - #[should_panic(expected = "not supported yet")] - fn test_unsupported_type_write() { - let a: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![8, 7, 6, 5, 8])); - let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap(); - let schema = batch.schema(); - let mut vector = vec![0; 1024]; - write_batch_unchecked(&mut vector, 0, &batch, 0, schema); - } - - #[test] - #[should_panic(expected = "not supported yet")] - fn test_unsupported_type_read() { - let schema = Arc::new(Schema::new(vec![Field::new("a", Utf8, false)])); - let vector = vec![0; 1024]; - let row_offsets = vec![0]; - read_as_batch(&vector, schema, &row_offsets).unwrap(); - } -} diff --git a/datafusion/row/src/reader.rs b/datafusion/row/src/reader.rs deleted file mode 100644 index 10c9896df70a..000000000000 --- a/datafusion/row/src/reader.rs +++ /dev/null @@ -1,366 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! [`read_as_batch`] converts raw bytes to [`RecordBatch`] - -use crate::layout::RowLayout; -use crate::validity::{all_valid, NullBitsFormatter}; -use crate::MutableRecordBatch; -use arrow::array::*; -use arrow::datatypes::{DataType, Schema}; -use arrow::record_batch::RecordBatch; -use arrow::util::bit_util::get_bit_raw; -use datafusion_common::{DataFusionError, Result}; -use std::sync::Arc; - -/// Read raw-bytes from `data` rows starting at `offsets` out to a [`RecordBatch`] -/// -/// -/// ```text -/// Read data to RecordBatch ┌──────────────────┐ -/// │ │ -/// │ │ -/// ┌───────────────────────┐ │ │ -/// │ │ │ RecordBatch │ -/// │ [u8] │─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ▶│ │ -/// │ │ │ (... N Rows ...) │ -/// └───────────────────────┘ │ │ -/// │ │ -/// │ │ -/// └──────────────────┘ -/// ``` -pub fn read_as_batch( - data: &[u8], - schema: Arc, - offsets: &[usize], -) -> Result { - let row_num = offsets.len(); - let mut output = MutableRecordBatch::new(row_num, schema.clone()); - let mut row = RowReader::new(&schema); - - for offset in offsets.iter().take(row_num) { - row.point_to(*offset, data); - read_row(&row, &mut output, &schema); - } - - output.output().map_err(DataFusionError::ArrowError) -} - -#[macro_export] -macro_rules! get_idx { - ($NATIVE: ident, $SELF: ident, $IDX: ident, $WIDTH: literal) => {{ - $SELF.assert_index_valid($IDX); - let offset = $SELF.field_offsets()[$IDX]; - let start = $SELF.base_offset + offset; - let end = start + $WIDTH; - $NATIVE::from_le_bytes($SELF.data[start..end].try_into().unwrap()) - }}; -} - -#[macro_export] -macro_rules! fn_get_idx { - ($NATIVE: ident, $WIDTH: literal) => { - paste::item! { - fn [](&self, idx: usize) -> $NATIVE { - self.assert_index_valid(idx); - let offset = self.field_offsets()[idx]; - let start = self.base_offset + offset; - let end = start + $WIDTH; - $NATIVE::from_le_bytes(self.data[start..end].try_into().unwrap()) - } - } - }; -} - -#[macro_export] -macro_rules! fn_get_idx_opt { - ($NATIVE: ident) => { - paste::item! { - pub fn [](&self, idx: usize) -> Option<$NATIVE> { - if self.is_valid_at(idx) { - Some(self.[](idx)) - } else { - None - } - } - } - }; -} - -/// Read the tuple `data[base_offset..]` we are currently pointing to -pub struct RowReader<'a> { - /// Layout on how to read each field - layout: RowLayout, - /// Raw bytes slice where the tuple stores - data: &'a [u8], - /// Start position for the current tuple in the raw bytes slice. - base_offset: usize, -} - -impl<'a> std::fmt::Debug for RowReader<'a> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - if self.null_free() { - write!(f, "null_free") - } else { - let null_bits = self.null_bits(); - write!( - f, - "{:?}", - NullBitsFormatter::new(null_bits, self.layout.field_count) - ) - } - } -} - -impl<'a> RowReader<'a> { - /// new - pub fn new(schema: &Schema) -> Self { - Self { - layout: RowLayout::new(schema), - data: &[], - base_offset: 0, - } - } - - /// Update this row to point to position `offset` in `base` - pub fn point_to(&mut self, offset: usize, data: &'a [u8]) { - self.base_offset = offset; - self.data = data; - } - - #[inline] - fn assert_index_valid(&self, idx: usize) { - assert!(idx < self.layout.field_count); - } - - #[inline(always)] - fn field_offsets(&self) -> &[usize] { - &self.layout.field_offsets - } - - #[inline(always)] - fn null_free(&self) -> bool { - self.layout.null_free - } - - #[inline(always)] - fn null_bits(&self) -> &[u8] { - if self.null_free() { - &[] - } else { - let start = self.base_offset; - &self.data[start..start + self.layout.null_width] - } - } - - #[inline(always)] - fn all_valid(&self) -> bool { - if self.null_free() { - true - } else { - let null_bits = self.null_bits(); - all_valid(null_bits, self.layout.field_count) - } - } - - fn is_valid_at(&self, idx: usize) -> bool { - unsafe { get_bit_raw(self.null_bits().as_ptr(), idx) } - } - - fn get_bool(&self, idx: usize) -> bool { - self.assert_index_valid(idx); - let offset = self.field_offsets()[idx]; - let value = &self.data[self.base_offset + offset..]; - value[0] != 0 - } - - fn get_u8(&self, idx: usize) -> u8 { - self.assert_index_valid(idx); - let offset = self.field_offsets()[idx]; - self.data[self.base_offset + offset] - } - - fn_get_idx!(u16, 2); - fn_get_idx!(u32, 4); - fn_get_idx!(u64, 8); - fn_get_idx!(i8, 1); - fn_get_idx!(i16, 2); - fn_get_idx!(i32, 4); - fn_get_idx!(i64, 8); - fn_get_idx!(f32, 4); - fn_get_idx!(f64, 8); - - fn get_date32(&self, idx: usize) -> i32 { - get_idx!(i32, self, idx, 4) - } - - fn get_date64(&self, idx: usize) -> i64 { - get_idx!(i64, self, idx, 8) - } - - fn get_decimal128(&self, idx: usize) -> i128 { - get_idx!(i128, self, idx, 16) - } - - fn_get_idx_opt!(bool); - fn_get_idx_opt!(u8); - fn_get_idx_opt!(u16); - fn_get_idx_opt!(u32); - fn_get_idx_opt!(u64); - fn_get_idx_opt!(i8); - fn_get_idx_opt!(i16); - fn_get_idx_opt!(i32); - fn_get_idx_opt!(i64); - fn_get_idx_opt!(f32); - fn_get_idx_opt!(f64); - - fn get_date32_opt(&self, idx: usize) -> Option { - if self.is_valid_at(idx) { - Some(self.get_date32(idx)) - } else { - None - } - } - - fn get_date64_opt(&self, idx: usize) -> Option { - if self.is_valid_at(idx) { - Some(self.get_date64(idx)) - } else { - None - } - } - - fn get_decimal128_opt(&self, idx: usize) -> Option { - if self.is_valid_at(idx) { - Some(self.get_decimal128(idx)) - } else { - None - } - } -} - -/// Read the row currently pointed by RowWriter to the output columnar batch buffer -pub fn read_row(row: &RowReader, batch: &mut MutableRecordBatch, schema: &Schema) { - if row.all_valid() { - for ((col_idx, to), field) in batch - .arrays - .iter_mut() - .enumerate() - .zip(schema.fields().iter()) - { - read_field_null_free(to, field.data_type(), col_idx, row) - } - } else { - for ((col_idx, to), field) in batch - .arrays - .iter_mut() - .enumerate() - .zip(schema.fields().iter()) - { - read_field(to, field.data_type(), col_idx, row) - } - } -} - -macro_rules! fn_read_field { - ($NATIVE: ident, $ARRAY: ident) => { - paste::item! { - pub(crate) fn [](to: &mut Box, col_idx: usize, row: &RowReader) { - let to = to - .as_any_mut() - .downcast_mut::<$ARRAY>() - .unwrap(); - to.append_option(row.[](col_idx)); - } - - pub(crate) fn [](to: &mut Box, col_idx: usize, row: &RowReader) { - let to = to - .as_any_mut() - .downcast_mut::<$ARRAY>() - .unwrap(); - to.append_value(row.[](col_idx)); - } - } - }; -} - -fn_read_field!(bool, BooleanBuilder); -fn_read_field!(u8, UInt8Builder); -fn_read_field!(u16, UInt16Builder); -fn_read_field!(u32, UInt32Builder); -fn_read_field!(u64, UInt64Builder); -fn_read_field!(i8, Int8Builder); -fn_read_field!(i16, Int16Builder); -fn_read_field!(i32, Int32Builder); -fn_read_field!(i64, Int64Builder); -fn_read_field!(f32, Float32Builder); -fn_read_field!(f64, Float64Builder); -fn_read_field!(date32, Date32Builder); -fn_read_field!(date64, Date64Builder); -fn_read_field!(decimal128, Decimal128Builder); - -fn read_field( - to: &mut Box, - dt: &DataType, - col_idx: usize, - row: &RowReader, -) { - use DataType::*; - match dt { - Boolean => read_field_bool(to, col_idx, row), - UInt8 => read_field_u8(to, col_idx, row), - UInt16 => read_field_u16(to, col_idx, row), - UInt32 => read_field_u32(to, col_idx, row), - UInt64 => read_field_u64(to, col_idx, row), - Int8 => read_field_i8(to, col_idx, row), - Int16 => read_field_i16(to, col_idx, row), - Int32 => read_field_i32(to, col_idx, row), - Int64 => read_field_i64(to, col_idx, row), - Float32 => read_field_f32(to, col_idx, row), - Float64 => read_field_f64(to, col_idx, row), - Date32 => read_field_date32(to, col_idx, row), - Date64 => read_field_date64(to, col_idx, row), - Decimal128(_, _) => read_field_decimal128(to, col_idx, row), - _ => unimplemented!(), - } -} - -fn read_field_null_free( - to: &mut Box, - dt: &DataType, - col_idx: usize, - row: &RowReader, -) { - use DataType::*; - match dt { - Boolean => read_field_bool_null_free(to, col_idx, row), - UInt8 => read_field_u8_null_free(to, col_idx, row), - UInt16 => read_field_u16_null_free(to, col_idx, row), - UInt32 => read_field_u32_null_free(to, col_idx, row), - UInt64 => read_field_u64_null_free(to, col_idx, row), - Int8 => read_field_i8_null_free(to, col_idx, row), - Int16 => read_field_i16_null_free(to, col_idx, row), - Int32 => read_field_i32_null_free(to, col_idx, row), - Int64 => read_field_i64_null_free(to, col_idx, row), - Float32 => read_field_f32_null_free(to, col_idx, row), - Float64 => read_field_f64_null_free(to, col_idx, row), - Date32 => read_field_date32_null_free(to, col_idx, row), - Date64 => read_field_date64_null_free(to, col_idx, row), - Decimal128(_, _) => read_field_decimal128_null_free(to, col_idx, row), - _ => unimplemented!(), - } -} diff --git a/datafusion/row/src/validity.rs b/datafusion/row/src/validity.rs deleted file mode 100644 index 45f5e19f1894..000000000000 --- a/datafusion/row/src/validity.rs +++ /dev/null @@ -1,161 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Row format validity utilities - -use arrow::util::bit_util::get_bit_raw; -use std::fmt::Write; - -const ALL_VALID_MASK: [u8; 8] = [1, 3, 7, 15, 31, 63, 127, 255]; - -/// Returns if all fields are valid -pub fn all_valid(data: &[u8], n: usize) -> bool { - for item in data.iter().take(n / 8) { - if *item != ALL_VALID_MASK[7] { - return false; - } - } - if n % 8 == 0 { - true - } else { - data[n / 8] == ALL_VALID_MASK[n % 8 - 1] - } -} - -/// Show null bit for each field in a tuple, 1 for valid and 0 for null. -/// For a tuple with nine total fields, valid at field 0, 6, 7, 8 shows as `[10000011, 1]`. -pub struct NullBitsFormatter<'a> { - null_bits: &'a [u8], - field_count: usize, -} - -impl<'a> NullBitsFormatter<'a> { - /// new - pub fn new(null_bits: &'a [u8], field_count: usize) -> Self { - Self { - null_bits, - field_count, - } - } -} - -impl<'a> std::fmt::Debug for NullBitsFormatter<'a> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let mut is_first = true; - let data = self.null_bits; - for i in 0..self.field_count { - if is_first { - f.write_char('[')?; - is_first = false; - } else if i % 8 == 0 { - f.write_str(", ")?; - } - if unsafe { get_bit_raw(data.as_ptr(), i) } { - f.write_char('1')?; - } else { - f.write_char('0')?; - } - } - f.write_char(']')?; - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use arrow::util::bit_util::{ceil, set_bit_raw, unset_bit_raw}; - use rand::Rng; - - fn test_validity(bs: &[bool]) { - let n = bs.len(); - let mut data = vec![0; ceil(n, 8)]; - for (i, b) in bs.iter().enumerate() { - if *b { - let data_argument = &mut data; - unsafe { - set_bit_raw(data_argument.as_mut_ptr(), i); - }; - } else { - let data_argument = &mut data; - unsafe { - unset_bit_raw(data_argument.as_mut_ptr(), i); - }; - } - } - let expected = bs.iter().all(|f| *f); - assert_eq!(all_valid(&data, bs.len()), expected); - } - - #[test] - fn test_all_valid() { - let sizes = [4, 8, 12, 16, 19, 23, 32, 44]; - for i in sizes { - { - // contains false - let input = { - let mut rng = rand::thread_rng(); - let mut input: Vec = vec![false; i]; - rng.fill(&mut input[..]); - input - }; - test_validity(&input); - } - - { - // all true - let input = vec![true; i]; - test_validity(&input); - } - } - } - - #[test] - fn test_formatter() -> std::fmt::Result { - assert_eq!( - format!("{:?}", NullBitsFormatter::new(&[0b11000001], 8)), - "[10000011]" - ); - assert_eq!( - format!("{:?}", NullBitsFormatter::new(&[0b11000001, 1], 9)), - "[10000011, 1]" - ); - assert_eq!(format!("{:?}", NullBitsFormatter::new(&[1], 2)), "[10]"); - assert_eq!(format!("{:?}", NullBitsFormatter::new(&[1], 3)), "[100]"); - assert_eq!(format!("{:?}", NullBitsFormatter::new(&[1], 4)), "[1000]"); - assert_eq!(format!("{:?}", NullBitsFormatter::new(&[1], 5)), "[10000]"); - assert_eq!(format!("{:?}", NullBitsFormatter::new(&[1], 6)), "[100000]"); - assert_eq!( - format!("{:?}", NullBitsFormatter::new(&[1], 7)), - "[1000000]" - ); - assert_eq!( - format!("{:?}", NullBitsFormatter::new(&[1], 8)), - "[10000000]" - ); - // extra bytes are ignored - assert_eq!( - format!("{:?}", NullBitsFormatter::new(&[0b11000001, 1, 1, 1], 9)), - "[10000011, 1]" - ); - assert_eq!( - format!("{:?}", NullBitsFormatter::new(&[0b11000001, 1, 1], 16)), - "[10000011, 10000000]" - ); - Ok(()) - } -} diff --git a/datafusion/row/src/writer.rs b/datafusion/row/src/writer.rs deleted file mode 100644 index 14ce6afe6832..000000000000 --- a/datafusion/row/src/writer.rs +++ /dev/null @@ -1,333 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! [`RowWriter`] writes [`RecordBatch`]es to `Vec` to stitch attributes together - -use crate::layout::RowLayout; -use arrow::array::*; -use arrow::datatypes::{DataType, Schema}; -use arrow::record_batch::RecordBatch; -use arrow::util::bit_util::{set_bit_raw, unset_bit_raw}; -use datafusion_common::cast::{as_date32_array, as_date64_array, as_decimal128_array}; -use datafusion_common::Result; -use std::sync::Arc; - -/// Append batch from `row_idx` to `output` buffer start from `offset` -/// # Panics -/// -/// This function will panic if the output buffer doesn't have enough space to hold all the rows -pub fn write_batch_unchecked( - output: &mut [u8], - offset: usize, - batch: &RecordBatch, - row_idx: usize, - schema: Arc, -) -> Vec { - let mut writer = RowWriter::new(&schema); - let mut current_offset = offset; - let mut offsets = vec![]; - let columns = batch.columns(); - for cur_row in row_idx..batch.num_rows() { - offsets.push(current_offset); - let row_width = write_row(&mut writer, cur_row, &schema, columns); - output[current_offset..current_offset + row_width] - .copy_from_slice(writer.get_row()); - current_offset += row_width; - writer.reset() - } - offsets -} - -/// Bench interpreted version write -#[inline(never)] -pub fn bench_write_batch( - batches: &[Vec], - schema: Arc, -) -> Result> { - let mut writer = RowWriter::new(&schema); - let mut lengths = vec![]; - - for batch in batches.iter().flatten() { - let columns = batch.columns(); - for cur_row in 0..batch.num_rows() { - let row_width = write_row(&mut writer, cur_row, &schema, columns); - lengths.push(row_width); - writer.reset() - } - } - - Ok(lengths) -} - -#[macro_export] -macro_rules! set_idx { - ($WIDTH: literal, $SELF: ident, $IDX: ident, $VALUE: ident) => {{ - $SELF.assert_index_valid($IDX); - let offset = $SELF.field_offsets()[$IDX]; - $SELF.data[offset..offset + $WIDTH].copy_from_slice(&$VALUE.to_le_bytes()); - }}; -} - -#[macro_export] -macro_rules! fn_set_idx { - ($NATIVE: ident, $WIDTH: literal) => { - paste::item! { - fn [](&mut self, idx: usize, value: $NATIVE) { - self.assert_index_valid(idx); - let offset = self.field_offsets()[idx]; - self.data[offset..offset + $WIDTH].copy_from_slice(&value.to_le_bytes()); - } - } - }; -} - -/// Reusable row writer backed by `Vec` -/// -/// ```text -/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ -/// RowWriter │ -/// ┌───────────────────────┐ │ [RowFormat] -/// │ │ │ -/// │ │ │(copy from Array -/// │ │ to [u8]) │ ┌───────────────────────┐ -/// │ RecordBatch │ └ ─ ─ ─ ─ ─ ─ ─ ─ │ RowFormat │ -/// │ │──────────────────────────────▶│ Vec │ -/// │ (... N Rows ...) │ │ │ -/// │ │ └───────────────────────┘ -/// │ │ -/// │ │ -/// └───────────────────────┘ -/// ``` -pub struct RowWriter { - /// Layout on how to write each field - layout: RowLayout, - /// Buffer for the current tuple being written. - data: Vec, - /// Length in bytes for the current tuple, 8-bytes word aligned. - pub(crate) row_width: usize, -} - -impl RowWriter { - /// New - pub fn new(schema: &Schema) -> Self { - let layout = RowLayout::new(schema); - let init_capacity = layout.fixed_part_width(); - Self { - layout, - data: vec![0; init_capacity], - row_width: init_capacity, - } - } - - /// Reset the row writer state for new tuple - pub fn reset(&mut self) { - self.data.fill(0); - self.row_width = self.layout.fixed_part_width(); - } - - #[inline] - fn assert_index_valid(&self, idx: usize) { - assert!(idx < self.layout.field_count); - } - - #[inline(always)] - fn field_offsets(&self) -> &[usize] { - &self.layout.field_offsets - } - - #[inline(always)] - fn null_free(&self) -> bool { - self.layout.null_free - } - - pub(crate) fn set_null_at(&mut self, idx: usize) { - assert!( - !self.null_free(), - "Unexpected call to set_null_at on null-free row writer" - ); - let null_bits = &mut self.data[0..self.layout.null_width]; - unsafe { - unset_bit_raw(null_bits.as_mut_ptr(), idx); - } - } - - pub(crate) fn set_non_null_at(&mut self, idx: usize) { - assert!( - !self.null_free(), - "Unexpected call to set_non_null_at on null-free row writer" - ); - let null_bits = &mut self.data[0..self.layout.null_width]; - unsafe { - set_bit_raw(null_bits.as_mut_ptr(), idx); - } - } - - fn set_bool(&mut self, idx: usize, value: bool) { - self.assert_index_valid(idx); - let offset = self.field_offsets()[idx]; - self.data[offset] = u8::from(value); - } - - fn set_u8(&mut self, idx: usize, value: u8) { - self.assert_index_valid(idx); - let offset = self.field_offsets()[idx]; - self.data[offset] = value; - } - - fn_set_idx!(u16, 2); - fn_set_idx!(u32, 4); - fn_set_idx!(u64, 8); - fn_set_idx!(i16, 2); - fn_set_idx!(i32, 4); - fn_set_idx!(i64, 8); - fn_set_idx!(f32, 4); - fn_set_idx!(f64, 8); - - fn set_i8(&mut self, idx: usize, value: i8) { - self.assert_index_valid(idx); - let offset = self.field_offsets()[idx]; - self.data[offset] = value.to_le_bytes()[0]; - } - - fn set_date32(&mut self, idx: usize, value: i32) { - set_idx!(4, self, idx, value) - } - - fn set_date64(&mut self, idx: usize, value: i64) { - set_idx!(8, self, idx, value) - } - - fn set_decimal128(&mut self, idx: usize, value: i128) { - set_idx!(16, self, idx, value) - } - - /// Get raw bytes - pub fn get_row(&self) -> &[u8] { - &self.data[0..self.row_width] - } -} - -/// Stitch attributes of tuple in `batch` at `row_idx` and returns the tuple width -pub fn write_row( - row_writer: &mut RowWriter, - row_idx: usize, - schema: &Schema, - columns: &[ArrayRef], -) -> usize { - // Get the row from the batch denoted by row_idx - if row_writer.null_free() { - for ((i, f), col) in schema.fields().iter().enumerate().zip(columns.iter()) { - write_field(i, row_idx, col, f.data_type(), row_writer); - } - } else { - for ((i, f), col) in schema.fields().iter().enumerate().zip(columns.iter()) { - if !col.is_null(row_idx) { - row_writer.set_non_null_at(i); - write_field(i, row_idx, col, f.data_type(), row_writer); - } else { - row_writer.set_null_at(i); - } - } - } - - row_writer.row_width -} - -macro_rules! fn_write_field { - ($NATIVE: ident, $ARRAY: ident) => { - paste::item! { - pub(crate) fn [](to: &mut RowWriter, from: &Arc, col_idx: usize, row_idx: usize) { - let from = from - .as_any() - .downcast_ref::<$ARRAY>() - .unwrap(); - to.[](col_idx, from.value(row_idx)); - } - } - }; -} - -fn_write_field!(bool, BooleanArray); -fn_write_field!(u8, UInt8Array); -fn_write_field!(u16, UInt16Array); -fn_write_field!(u32, UInt32Array); -fn_write_field!(u64, UInt64Array); -fn_write_field!(i8, Int8Array); -fn_write_field!(i16, Int16Array); -fn_write_field!(i32, Int32Array); -fn_write_field!(i64, Int64Array); -fn_write_field!(f32, Float32Array); -fn_write_field!(f64, Float64Array); - -pub(crate) fn write_field_date32( - to: &mut RowWriter, - from: &Arc, - col_idx: usize, - row_idx: usize, -) { - match as_date32_array(from) { - Ok(from) => to.set_date32(col_idx, from.value(row_idx)), - Err(e) => panic!("{e}"), - }; -} - -pub(crate) fn write_field_date64( - to: &mut RowWriter, - from: &Arc, - col_idx: usize, - row_idx: usize, -) { - let from = as_date64_array(from).unwrap(); - to.set_date64(col_idx, from.value(row_idx)); -} - -pub(crate) fn write_field_decimal128( - to: &mut RowWriter, - from: &Arc, - col_idx: usize, - row_idx: usize, -) { - let from = as_decimal128_array(from).unwrap(); - to.set_decimal128(col_idx, from.value(row_idx)); -} - -fn write_field( - col_idx: usize, - row_idx: usize, - col: &Arc, - dt: &DataType, - row: &mut RowWriter, -) { - use DataType::*; - match dt { - Boolean => write_field_bool(row, col, col_idx, row_idx), - UInt8 => write_field_u8(row, col, col_idx, row_idx), - UInt16 => write_field_u16(row, col, col_idx, row_idx), - UInt32 => write_field_u32(row, col, col_idx, row_idx), - UInt64 => write_field_u64(row, col, col_idx, row_idx), - Int8 => write_field_i8(row, col, col_idx, row_idx), - Int16 => write_field_i16(row, col, col_idx, row_idx), - Int32 => write_field_i32(row, col, col_idx, row_idx), - Int64 => write_field_i64(row, col, col_idx, row_idx), - Float32 => write_field_f32(row, col, col_idx, row_idx), - Float64 => write_field_f64(row, col, col_idx, row_idx), - Date32 => write_field_date32(row, col, col_idx, row_idx), - Date64 => write_field_date64(row, col, col_idx, row_idx), - Decimal128(_, _) => write_field_decimal128(row, col, col_idx, row_idx), - _ => unimplemented!(), - } -} From 4f1eaed9a2ae7271a41c6623980a94886d79a68c Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 19 Jul 2023 07:32:01 -0400 Subject: [PATCH 3/4] Update datafusion cli --- datafusion-cli/Cargo.lock | 94 +++++++++++++++++---------------------- 1 file changed, 41 insertions(+), 53 deletions(-) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index c94e81311c90..53af19e80b3f 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -56,9 +56,9 @@ dependencies = [ [[package]] name = "allocator-api2" -version = "0.2.15" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56fc6cf8dc8c4158eed8649f9b8b0ea1518eb62b544fe9490d66fa0b349eafe9" +checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" [[package]] name = "android-tzdata" @@ -307,9 +307,9 @@ dependencies = [ [[package]] name = "assert_cmd" -version = "2.0.11" +version = "2.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86d6b683edf8d1119fe420a94f8a7e389239666aa72e65495d91c00462510151" +checksum = "88903cb14723e4d4003335bb7f8a14f27691649105346a0f0957466c096adfe6" dependencies = [ "anstyle", "bstr", @@ -346,7 +346,7 @@ checksum = "a564d521dd56509c4c47480d00b80ee55f7e385ae48db5744c67ad50c92d2ebf" dependencies = [ "proc-macro2", "quote", - "syn 2.0.25", + "syn 2.0.26", ] [[package]] @@ -1029,7 +1029,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1f34ba9a9bcb8645379e9de8cb3ecfcf4d1c85ba66d90deb3259206fa5aa193b" dependencies = [ "quote", - "syn 2.0.25", + "syn 2.0.26", ] [[package]] @@ -1064,7 +1064,6 @@ dependencies = [ "datafusion-expr", "datafusion-optimizer", "datafusion-physical-expr", - "datafusion-row", "datafusion-sql", "flate2", "futures", @@ -1189,7 +1188,6 @@ dependencies = [ "chrono", "datafusion-common", "datafusion-expr", - "datafusion-row", "half", "hashbrown 0.14.0", "hex", @@ -1208,16 +1206,6 @@ dependencies = [ "uuid", ] -[[package]] -name = "datafusion-row" -version = "27.0.0" -dependencies = [ - "arrow", - "datafusion-common", - "paste", - "rand", -] - [[package]] name = "datafusion-sql" version = "27.0.0" @@ -1491,7 +1479,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.25", + "syn 2.0.26", ] [[package]] @@ -1843,9 +1831,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.8" +version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62b02a5381cc465bd3041d84623d0fa3b66738b52b8e2fc3bab8ad63ab032f4a" +checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" [[package]] name = "jobserver" @@ -2320,9 +2308,9 @@ dependencies = [ [[package]] name = "paste" -version = "1.0.13" +version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4b27ab7be369122c218afc2079489cdcb4b517c0a3fc386ff11e1fedfcc2b35" +checksum = "de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c" [[package]] name = "percent-encoding" @@ -2395,7 +2383,7 @@ checksum = "ec2e072ecce94ec471b13398d5402c188e76ac03cf74dd1a975161b23a3f6d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.25", + "syn 2.0.26", ] [[package]] @@ -2485,9 +2473,9 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" [[package]] name = "proc-macro2" -version = "1.0.64" +version = "1.0.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78803b62cbf1f46fde80d7c0e803111524b9877184cfe7c3033659490ac7a7da" +checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9" dependencies = [ "unicode-ident", ] @@ -2504,9 +2492,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.29" +version = "1.0.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "573015e8ab27661678357f27dc26460738fd2b6c86e46f386fde94cb5d913105" +checksum = "5fe8a65d69dd0808184ebb5f836ab526bb259db23c657efa38711b1072ee47f0" dependencies = [ "proc-macro2", ] @@ -2790,9 +2778,9 @@ dependencies = [ [[package]] name = "rustversion" -version = "1.0.13" +version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc31bd9b61a32c31f9650d18add92aa83a49ba979c143eefd27fe7177b05bd5f" +checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4" [[package]] name = "rustyline" @@ -2819,9 +2807,9 @@ dependencies = [ [[package]] name = "ryu" -version = "1.0.14" +version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe232bdf6be8c8de797b22184ee71118d63780ea42ac85b61d1baa6d3b782ae9" +checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741" [[package]] name = "same-file" @@ -2843,9 +2831,9 @@ dependencies = [ [[package]] name = "scopeguard" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "sct" @@ -2882,15 +2870,15 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.17" +version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bebd363326d05ec3e2f532ab7660680f3b02130d780c299bca73469d521bc0ed" +checksum = "b0293b4b29daaf487284529cc2f5675b8e57c61f70167ba415a463651fd6a918" [[package]] name = "seq-macro" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63134939175b3131fe4d2c131b103fd42f25ccca89423d43b5e4f267920ccf03" +checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" @@ -2909,14 +2897,14 @@ checksum = "389894603bd18c46fa56231694f8d827779c0951a667087194cf9de94ed24682" dependencies = [ "proc-macro2", "quote", - "syn 2.0.25", + "syn 2.0.26", ] [[package]] name = "serde_json" -version = "1.0.102" +version = "1.0.103" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5062a995d481b2308b6064e9af76011f2921c35f97b0468811ed9f6cd91dfed" +checksum = "d03b412469450d4404fe8499a268edd7f8b79fecb074b0d812ad64ca21f4031b" dependencies = [ "itoa", "ryu", @@ -3088,7 +3076,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.25", + "syn 2.0.26", ] [[package]] @@ -3110,9 +3098,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.25" +version = "2.0.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15e3fc8c0c74267e2df136e5e5fb656a464158aa57624053375eb9c8c6e25ae2" +checksum = "45c3457aacde3c65315de5031ec191ce46604304d2446e803d71ade03308d970" dependencies = [ "proc-macro2", "quote", @@ -3171,7 +3159,7 @@ checksum = "463fe12d7993d3b327787537ce8dd4dfa058de32fc2b195ef3cde03dc4771e8f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.25", + "syn 2.0.26", ] [[package]] @@ -3262,7 +3250,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.25", + "syn 2.0.26", ] [[package]] @@ -3360,7 +3348,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" dependencies = [ "proc-macro2", "quote", - "syn 2.0.25", + "syn 2.0.26", ] [[package]] @@ -3402,9 +3390,9 @@ checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460" [[package]] name = "unicode-ident" -version = "1.0.10" +version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22049a19f4a68748a168c0fc439f9516686aa045927ff767eca0a85101fb6e73" +checksum = "301abaae475aa91687eb82514b328ab47a211a533026cb25fc3e519b86adfc3c" [[package]] name = "unicode-normalization" @@ -3458,9 +3446,9 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" [[package]] name = "uuid" -version = "1.4.0" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d023da39d1fde5a8a3fe1f3e01ca9632ada0a63e9797de55a879d6e2236277be" +checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d" dependencies = [ "getrandom", ] @@ -3532,7 +3520,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.25", + "syn 2.0.26", "wasm-bindgen-shared", ] @@ -3566,7 +3554,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.25", + "syn 2.0.26", "wasm-bindgen-backend", "wasm-bindgen-shared", ] From e3907d2fea9022983f1aff16878ecd5f688e1297 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 19 Jul 2023 07:34:20 -0400 Subject: [PATCH 4/4] update depes --- datafusion/core/src/lib.rs | 1 - dev/release/crate-deps.dot | 2 - dev/release/crate-deps.svg | 134 ++++++++++++++++--------------------- 3 files changed, 58 insertions(+), 79 deletions(-) diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index 8a7e39301a9d..c0ed13c8e063 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -398,7 +398,6 @@ //! * [datafusion_execution]: State and structures needed for execution //! * [datafusion_optimizer]: [`OptimizerRule`]s and [`AnalyzerRule`]s //! * [datafusion_physical_expr]: [`PhysicalExpr`] and related expressions -//! * [datafusion_row]: Row based representation //! * [datafusion_sql]: SQL planner ([`SqlToRel`]) //! //! [sqlparser]: https://docs.rs/sqlparser/latest/sqlparser diff --git a/dev/release/crate-deps.dot b/dev/release/crate-deps.dot index a2199befaf8e..756614d4d344 100644 --- a/dev/release/crate-deps.dot +++ b/dev/release/crate-deps.dot @@ -30,13 +30,11 @@ digraph G { datafusion_physical_expr -> datafusion_common datafusion_physical_expr -> datafusion_expr - datafusion_row -> datafusion_common datafusion -> datafusion_common datafusion -> datafusion_expr datafusion -> datafusion_optimizer datafusion -> datafusion_physical_expr - datafusion -> datafusion_row datafusion -> datafusion_sql datafusion_proto -> datafusion diff --git a/dev/release/crate-deps.svg b/dev/release/crate-deps.svg index f55a5fcd7b24..388e9e7705dc 100644 --- a/dev/release/crate-deps.svg +++ b/dev/release/crate-deps.svg @@ -1,31 +1,31 @@ - - + G - + datafusion_common - -datafusion_common + +datafusion_common datafusion_expr - -datafusion_expr + +datafusion_expr datafusion_expr->datafusion_common - - + + @@ -36,14 +36,14 @@ datafusion_sql->datafusion_common - - + + datafusion_sql->datafusion_expr - - + + @@ -54,14 +54,14 @@ datafusion_optimizer->datafusion_common - - + + datafusion_optimizer->datafusion_expr - - + + @@ -72,104 +72,86 @@ datafusion_physical_expr->datafusion_common - - + + datafusion_physical_expr->datafusion_expr - - - - - -datafusion_row - -datafusion_row - - - -datafusion_row->datafusion_common - - + + - + datafusion - -datafusion + +datafusion - + datafusion->datafusion_common - - + + - + datafusion->datafusion_expr - - + + - + datafusion->datafusion_sql - - + + - + datafusion->datafusion_optimizer - - + + - + datafusion->datafusion_physical_expr - - - - - -datafusion->datafusion_row - - + + - + datafusion_proto - -datafusion_proto + +datafusion_proto - + datafusion_proto->datafusion - - + + - + datafusion_substrait - -datafusion_substrait + +datafusion_substrait - + datafusion_substrait->datafusion - - + + - + datafusion_cli - -datafusion_cli + +datafusion_cli - + datafusion_cli->datafusion - - + +