diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
index d8a9697b2bf7..3efb0df9df7c 100644
--- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -36,7 +36,7 @@ use crate::{
 use arrow::array::{ArrayData, BufferBuilder};
 use arrow::buffer::Buffer;
 use arrow::datatypes::{ArrowNativeType, UInt16Type};
-use arrow_array::{ArrayRef, DictionaryArray, RecordBatch};
+use arrow_array::{ArrayRef, DictionaryArray, RecordBatch, RecordBatchOptions};
 use arrow_schema::{DataType, Field, Schema, SchemaRef};
 use datafusion_common::stats::Precision;
 use datafusion_common::{exec_err, ColumnStatistics, Statistics};
@@ -339,7 +339,13 @@ impl PartitionColumnProjector {
                 ),
             )
         }
-        RecordBatch::try_new(Arc::clone(&self.projected_schema), cols).map_err(Into::into)
+
+        RecordBatch::try_new_with_options(
+            Arc::clone(&self.projected_schema),
+            cols,
+            &RecordBatchOptions::new().with_row_count(Some(file_batch.num_rows())),
+        )
+        .map_err(Into::into)
     }
 }
 
diff --git a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
index 4679ca6d07df..e374abd6e891 100644
--- a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
+++ b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
@@ -149,10 +149,12 @@ impl TableProvider for CustomProvider {
     async fn scan(
         &self,
         _state: &SessionState,
-        _: Option<&Vec<usize>>,
+        projection: Option<&Vec<usize>>,
         filters: &[Expr],
         _: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
+        let empty = Vec::new();
+        let projection = projection.unwrap_or(&empty);
         match &filters[0] {
             Expr::BinaryExpr(BinaryExpr { right, .. }) => {
                 let int_value = match &**right {
@@ -182,7 +184,10 @@ impl TableProvider for CustomProvider {
         };
 
         Ok(Arc::new(CustomPlan {
-            schema: self.zero_batch.schema(),
+            schema: match projection.is_empty() {
+                true => Arc::new(Schema::empty()),
+                false => self.zero_batch.schema(),
+            },
             batches: match int_value {
                 0 => vec![self.zero_batch.clone()],
                 1 => vec![self.one_batch.clone()],
@@ -191,7 +196,10 @@ impl TableProvider for CustomProvider {
             }))
         }
         _ => Ok(Arc::new(CustomPlan {
-            schema: self.zero_batch.schema(),
+            schema: match projection.is_empty() {
+                true => Arc::new(Schema::empty()),
+                false => self.zero_batch.schema(),
+            },
             batches: vec![],
         })),
     }
diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs
index 7238369f832a..2436e82f3ce9 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -788,7 +788,7 @@ async fn explain_logical_plan_only() {
         "logical_plan",
         "Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]\
         \n  SubqueryAlias: t\
-        \n    Projection: column2\
+        \n    Projection: \
         \n      Values: (Utf8(\"a\"), Int64(1), Int64(100)), (Utf8(\"a\"), Int64(2), Int64(150))"
     ]];
     assert_eq!(expected, actual);
diff --git a/datafusion/optimizer/src/push_down_projection.rs b/datafusion/optimizer/src/push_down_projection.rs
index 839f6b5bb8f6..e7fdaa8b0b5e 100644
--- a/datafusion/optimizer/src/push_down_projection.rs
+++ b/datafusion/optimizer/src/push_down_projection.rs
@@ -23,7 +23,6 @@ use crate::merge_projection::merge_projection;
 use crate::optimizer::ApplyOrder;
 use crate::push_down_filter::replace_cols_by_name;
 use crate::{OptimizerConfig, OptimizerRule};
-use arrow::datatypes::DataType;
 use arrow::error::Result as ArrowResult;
 use datafusion_common::ScalarValue::UInt8;
 use datafusion_common::{
@@ -149,10 +148,6 @@ impl OptimizerRule for PushDownProjection {
             {
                 let mut used_columns: HashSet<Column> = HashSet::new();
                 if projection_is_empty {
-                    let field = find_small_field(scan.projected_schema.fields()).ok_or(
-                        DataFusionError::Internal("Scan with empty schema".to_string()),
-                    )?;
-                    used_columns.insert(field.qualified_column());
                     push_down_scan(&used_columns, scan, true)?
                 } else {
                     for expr in projection.expr.iter() {
@@ -163,17 +158,6 @@ impl OptimizerRule for PushDownProjection {
                     plan.with_new_inputs(&[new_scan])?
                 }
             }
-            LogicalPlan::Values(values) if projection_is_empty => {
-                let field = find_small_field(values.schema.fields()).ok_or(
-                    DataFusionError::Internal("Values with empty schema".to_string()),
-                )?;
-                let column = Expr::Column(field.qualified_column());
-
-                LogicalPlan::Projection(Projection::try_new(
-                    vec![column],
-                    Arc::new(child_plan.clone()),
-                )?)
-            }
             LogicalPlan::Union(union) => {
                 let mut required_columns = HashSet::new();
                 exprlist_to_columns(&projection.expr, &mut required_columns)?;
@@ -429,87 +413,6 @@ pub fn collect_projection_expr(projection: &Projection) -> HashMap<String, Expr>
         .collect::<HashMap<_, _>>()
 }
 
-/// Accumulate the memory size of a data type measured in bits.
-///
-/// Types with a variable size get assigned with a fixed size which is greater than most
-/// primitive types.
-///
-/// While traversing nested types, `nesting` is incremented on every level.
-fn nested_size(data_type: &DataType, nesting: &mut usize) -> usize {
-    use DataType::*;
-    if data_type.is_primitive() {
-        return data_type.primitive_width().unwrap_or(1) * 8;
-    }
-
-    if data_type.is_nested() {
-        *nesting += 1;
-    }
-
-    match data_type {
-        Null => 0,
-        Boolean => 1,
-        Binary | Utf8 => 128,
-        LargeBinary | LargeUtf8 => 256,
-        FixedSizeBinary(bytes) => (*bytes * 8) as usize,
-        // primitive types
-        Int8
-        | Int16
-        | Int32
-        | Int64
-        | UInt8
-        | UInt16
-        | UInt32
-        | UInt64
-        | Float16
-        | Float32
-        | Float64
-        | Timestamp(_, _)
-        | Date32
-        | Date64
-        | Time32(_)
-        | Time64(_)
-        | Duration(_)
-        | Interval(_)
-        | Dictionary(_, _)
-        | Decimal128(_, _)
-        | Decimal256(_, _) => data_type.primitive_width().unwrap_or(1) * 8,
-        // nested types
-        List(f) => nested_size(f.data_type(), nesting),
-        FixedSizeList(_, s) => (s * 8) as usize,
-        LargeList(f) => nested_size(f.data_type(), nesting),
-        Struct(fields) => fields
-            .iter()
-            .map(|f| nested_size(f.data_type(), nesting))
-            .sum(),
-        Union(fields, _) => fields
-            .iter()
-            .map(|(_, f)| nested_size(f.data_type(), nesting))
-            .sum(),
-        Map(field, _) => nested_size(field.data_type(), nesting),
-        RunEndEncoded(run_ends, values) => {
-            nested_size(run_ends.data_type(), nesting)
-                + nested_size(values.data_type(), nesting)
-        }
-    }
-}
-
-/// Find a field with a presumable small memory footprint based on its data type's memory size
-/// and the level of nesting.
-fn find_small_field(fields: &[DFField]) -> Option<DFField> {
-    fields
-        .iter()
-        .map(|f| {
-            let nesting = &mut 0;
-            let size = nested_size(f.data_type(), nesting);
-            (*nesting, size)
-        })
-        .enumerate()
-        .min_by(|(_, (nesting_a, size_a)), (_, (nesting_b, size_b))| {
-            nesting_a.cmp(nesting_b).then(size_a.cmp(size_b))
-        })
-        .map(|(i, _)| fields[i].clone())
-}
-
 /// Get the projection exprs from columns in the order of the schema
 fn get_expr(columns: &HashSet<Column>, schema: &DFSchemaRef) -> Result<Vec<Expr>> {
     let expr = schema
@@ -640,7 +543,7 @@ mod tests {
     use crate::optimizer::Optimizer;
     use crate::test::*;
     use crate::OptimizerContext;
-    use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
+    use arrow::datatypes::{DataType, Field, Schema};
     use datafusion_common::DFSchema;
     use datafusion_expr::builder::table_scan_with_filters;
     use datafusion_expr::expr;
@@ -1232,73 +1135,4 @@ mod tests {
         .unwrap_or(optimized_plan);
         Ok(optimized_plan)
     }
-
-    #[test]
-    fn test_nested_size() {
-        use DataType::*;
-        let nesting = &mut 0;
-        assert_eq!(nested_size(&Null, nesting), 0);
-        assert_eq!(*nesting, 0);
-        assert_eq!(nested_size(&Boolean, nesting), 1);
-        assert_eq!(*nesting, 0);
-        assert_eq!(nested_size(&UInt8, nesting), 8);
-        assert_eq!(*nesting, 0);
-        assert_eq!(nested_size(&Int64, nesting), 64);
-        assert_eq!(*nesting, 0);
-        assert_eq!(nested_size(&Decimal256(5, 2), nesting), 256);
-        assert_eq!(*nesting, 0);
-        assert_eq!(
-            nested_size(&List(Arc::new(Field::new("A", Int64, true))), nesting),
-            64
-        );
-        assert_eq!(*nesting, 1);
-        *nesting = 0;
-        assert_eq!(
-            nested_size(
-                &List(Arc::new(Field::new(
-                    "A",
-                    List(Arc::new(Field::new("AA", Int64, true))),
-                    true
-                ))),
-                nesting
-            ),
-            64
-        );
-        assert_eq!(*nesting, 2);
-    }
-
-    #[test]
-    fn test_find_small_field() {
-        use DataType::*;
-        let int32 = DFField::from(Field::new("a", Int32, false));
-        let bin = DFField::from(Field::new("b", Binary, false));
-        let list_i64 = DFField::from(Field::new(
-            "c",
-            List(Arc::new(Field::new("c_1", Int64, true))),
-            false,
-        ));
-        let time_s = DFField::from(Field::new("d", Time32(TimeUnit::Second), false));
-
-        assert_eq!(
-            find_small_field(&[
-                int32.clone(),
-                bin.clone(),
-                list_i64.clone(),
-                time_s.clone()
-            ]),
-            Some(int32.clone())
-        );
-        assert_eq!(
-            find_small_field(&[bin.clone(), list_i64.clone(), time_s.clone()]),
-            Some(time_s.clone())
-        );
-        assert_eq!(
-            find_small_field(&[time_s.clone(), int32.clone()]),
-            Some(time_s.clone())
-        );
-        assert_eq!(
-            find_small_field(&[bin.clone(), list_i64.clone()]),
-            Some(bin.clone())
-        );
-    }
 }
diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs
index 1ffd9ad1c18a..866cd52d9631 100644
--- a/datafusion/physical-plan/src/joins/cross_join.rs
+++ b/datafusion/physical-plan/src/joins/cross_join.rs
@@ -35,6 +35,7 @@ use crate::{
 
 use arrow::datatypes::{Fields, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
+use arrow_array::RecordBatchOptions;
 use datafusion_common::stats::Precision;
 use datafusion_common::{plan_err, DataFusionError, Result, ScalarValue};
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
@@ -347,13 +348,14 @@ fn build_batch(
         })
         .collect::<Result<Vec<_>>>()?;
 
-    RecordBatch::try_new(
+    RecordBatch::try_new_with_options(
         Arc::new(schema.clone()),
         arrays
             .iter()
             .chain(batch.columns().iter())
            .cloned()
             .collect(),
+        &RecordBatchOptions::new().with_row_count(Some(batch.num_rows())),
     )
     .map_err(Into::into)
 }
diff --git a/datafusion/sqllogictest/test_files/avro.slt b/datafusion/sqllogictest/test_files/avro.slt
index bd2ba706663c..3f21274c009f 100644
--- a/datafusion/sqllogictest/test_files/avro.slt
+++ b/datafusion/sqllogictest/test_files/avro.slt
@@ -253,10 +253,10 @@ EXPLAIN SELECT count(*) from alltypes_plain
 ----
 logical_plan
 Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
---TableScan: alltypes_plain projection=[bool_col]
+--TableScan: alltypes_plain projection=[]
 physical_plan
 AggregateExec: mode=Final, gby=[], aggr=[COUNT(*)]
 --CoalescePartitionsExec
 ----AggregateExec: mode=Partial, gby=[], aggr=[COUNT(*)]
 ------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
---------AvroExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/avro/alltypes_plain.avro]]}, projection=[bool_col]
+--------AvroExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/avro/alltypes_plain.avro]]}
diff --git a/datafusion/sqllogictest/test_files/json.slt b/datafusion/sqllogictest/test_files/json.slt
index f903e48063f8..b3e6ae18a96b 100644
--- a/datafusion/sqllogictest/test_files/json.slt
+++ b/datafusion/sqllogictest/test_files/json.slt
@@ -50,13 +50,13 @@ EXPLAIN SELECT count(*) from json_test
 ----
 logical_plan
 Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
---TableScan: json_test projection=[c]
+--TableScan: json_test projection=[]
 physical_plan
 AggregateExec: mode=Final, gby=[], aggr=[COUNT(*)]
 --CoalescePartitionsExec
 ----AggregateExec: mode=Partial, gby=[], aggr=[COUNT(*)]
 ------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
---------JsonExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/2.json]]}, projection=[c]
+--------JsonExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/2.json]]}
 
 query error DataFusion error: Schema error: No field named mycol\.
 SELECT mycol FROM single_nan
diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt
index 7cbb848f3333..822a70bb5bad 100644
--- a/datafusion/sqllogictest/test_files/subquery.slt
+++ b/datafusion/sqllogictest/test_files/subquery.slt
@@ -695,7 +695,7 @@ logical_plan
 Projection: __scalar_sq_1.COUNT(*) AS b
 --SubqueryAlias: __scalar_sq_1
 ----Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
-------TableScan: t1 projection=[t1_id]
+------TableScan: t1 projection=[]
 
 #simple_uncorrelated_scalar_subquery2
 query TT
@@ -706,10 +706,10 @@ Projection: __scalar_sq_1.COUNT(*) AS b, __scalar_sq_2.COUNT(Int64(1)) AS COUNT(
 --Left Join: 
 ----SubqueryAlias: __scalar_sq_1
 ------Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
---------TableScan: t1 projection=[t1_id]
+--------TableScan: t1 projection=[]
 ----SubqueryAlias: __scalar_sq_2
 ------Aggregate: groupBy=[[]], aggr=[[COUNT(Int64(1))]]
---------TableScan: t2 projection=[t2_id]
+--------TableScan: t2 projection=[]
 
 query II
 select (select count(*) from t1) as b, (select count(1) from t2)
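
A note on the mechanism, as an illustration rather than part of the patch: both `try_new_with_options` call sites above exist because a batch produced under an empty projection has zero columns, and arrow-rs cannot infer a row count from zero columns, so plain `RecordBatch::try_new` rejects such a batch. `RecordBatchOptions::with_row_count` supplies the count explicitly, which is what keeps `count(*)` correct over empty projections. A minimal self-contained sketch follows; the empty schema and the row count of 3 are made up for illustration:

use std::sync::Arc;

use arrow_array::{RecordBatch, RecordBatchOptions};
use arrow_schema::Schema;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Zero-column schema, as produced by a scan with `projection=[]`.
    let schema = Arc::new(Schema::empty());

    // `RecordBatch::try_new(schema, vec![])` would fail here: with no
    // columns there is nothing to infer the row count from.
    let batch = RecordBatch::try_new_with_options(
        schema,
        vec![],
        // Carry the row count explicitly so downstream count(*) sees 3 rows.
        &RecordBatchOptions::new().with_row_count(Some(3)),
    )?;

    assert_eq!(batch.num_columns(), 0);
    assert_eq!(batch.num_rows(), 3);
    Ok(())
}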