support scan empty projection #7920

Merged · 7 commits · Oct 25, 2023
10 changes: 8 additions & 2 deletions datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -36,7 +36,7 @@ use crate::{
use arrow::array::{ArrayData, BufferBuilder};
use arrow::buffer::Buffer;
use arrow::datatypes::{ArrowNativeType, UInt16Type};
-use arrow_array::{ArrayRef, DictionaryArray, RecordBatch};
+use arrow_array::{ArrayRef, DictionaryArray, RecordBatch, RecordBatchOptions};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion_common::stats::Precision;
use datafusion_common::{exec_err, ColumnStatistics, Statistics};
@@ -339,7 +339,13 @@ impl PartitionColumnProjector {
),
)
}
-        RecordBatch::try_new(Arc::clone(&self.projected_schema), cols).map_err(Into::into)
+
+        RecordBatch::try_new_with_options(
+            Arc::clone(&self.projected_schema),
+            cols,
+            &RecordBatchOptions::new().with_row_count(Some(file_batch.num_rows())),
+        )
+        .map_err(Into::into)
}
}

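For context, a minimal standalone sketch (not part of the diff; the main harness and the literal row count of 100 are illustrative assumptions) of why the explicit row count matters here: with an empty projection the projected batch has zero columns, and RecordBatch::try_new has no column from which to infer a row count, so the count must be passed through RecordBatchOptions.

    use std::sync::Arc;

    use arrow_array::{RecordBatch, RecordBatchOptions};
    use arrow_schema::Schema;

    fn main() {
        let schema = Arc::new(Schema::empty());

        // With zero columns there is nothing to infer a row count from,
        // so building the batch the ordinary way fails.
        assert!(RecordBatch::try_new(Arc::clone(&schema), vec![]).is_err());

        // Passing the row count explicitly keeps COUNT(*)-style scans working:
        // the batch carries no columns but still reports 100 rows.
        let batch = RecordBatch::try_new_with_options(
            schema,
            vec![],
            &RecordBatchOptions::new().with_row_count(Some(100)),
        )
        .unwrap();
        assert_eq!(batch.num_rows(), 100);
        assert_eq!(batch.num_columns(), 0);
    }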
@@ -149,10 +149,12 @@ impl TableProvider for CustomProvider {
async fn scan(
&self,
_state: &SessionState,
-        _: Option<&Vec<usize>>,
+        projection: Option<&Vec<usize>>,
filters: &[Expr],
_: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
+        let empty = Vec::new();
+        let projection = projection.unwrap_or(&empty);
match &filters[0] {
Expr::BinaryExpr(BinaryExpr { right, .. }) => {
let int_value = match &**right {
@@ -182,7 +184,10 @@ impl TableProvider for CustomProvider {
};

Ok(Arc::new(CustomPlan {
-            schema: self.zero_batch.schema(),
+            schema: match projection.is_empty() {
+                true => Arc::new(Schema::empty()),
+                false => self.zero_batch.schema(),
+            },
batches: match int_value {
0 => vec![self.zero_batch.clone()],
1 => vec![self.one_batch.clone()],
@@ -191,7 +196,10 @@
}))
}
_ => Ok(Arc::new(CustomPlan {
-            schema: self.zero_batch.schema(),
+            schema: match projection.is_empty() {
+                true => Arc::new(Schema::empty()),
+                false => self.zero_batch.schema(),
+            },
batches: vec![],
})),
}
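As a hedged sketch of the pattern this test exercises (the projected_schema helper and the two-field schema below are hypothetical, not code from this PR): a provider handed Some(&vec![]) should expose a zero-field schema, so downstream operators can rely on the reported row count instead of columns that were never requested.

    use std::sync::Arc;

    use arrow_schema::{DataType, Field, Schema, SchemaRef};

    // Hypothetical helper: derive the schema a scan should expose for a
    // given optional projection.
    fn projected_schema(full: &SchemaRef, projection: Option<&Vec<usize>>) -> SchemaRef {
        match projection {
            // Empty projection: expose no columns at all.
            Some(p) if p.is_empty() => Arc::new(Schema::empty()),
            // Ordinary projection: keep only the requested fields.
            Some(p) => Arc::new(full.project(p).expect("valid projection indices")),
            // No projection: expose the full schema.
            None => Arc::clone(full),
        }
    }

    fn main() {
        let full: SchemaRef = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        assert_eq!(projected_schema(&full, Some(&vec![])).fields().len(), 0);
        assert_eq!(projected_schema(&full, Some(&vec![1])).fields().len(), 1);
        assert_eq!(projected_schema(&full, None).fields().len(), 2);
    }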
2 changes: 1 addition & 1 deletion datafusion/core/tests/sql/explain_analyze.rs
@@ -788,7 +788,7 @@ async fn explain_logical_plan_only() {
"logical_plan",
"Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]\
\n SubqueryAlias: t\
-\n Projection: column2\
+\n Projection: \
\n Values: (Utf8(\"a\"), Int64(1), Int64(100)), (Utf8(\"a\"), Int64(2), Int64(150))"
]];
assert_eq!(expected, actual);
168 changes: 1 addition & 167 deletions datafusion/optimizer/src/push_down_projection.rs
@@ -23,7 +23,6 @@ use crate::merge_projection::merge_projection;
use crate::optimizer::ApplyOrder;
use crate::push_down_filter::replace_cols_by_name;
use crate::{OptimizerConfig, OptimizerRule};
-use arrow::datatypes::DataType;
use arrow::error::Result as ArrowResult;
use datafusion_common::ScalarValue::UInt8;
use datafusion_common::{
@@ -149,10 +148,6 @@ impl OptimizerRule for PushDownProjection {
{
let mut used_columns: HashSet<Column> = HashSet::new();
if projection_is_empty {
-                let field = find_small_field(scan.projected_schema.fields()).ok_or(
-                    DataFusionError::Internal("Scan with empty schema".to_string()),
-                )?;
-                used_columns.insert(field.qualified_column());

Review thread on the removed lines (reply by @haohuaijin, Contributor and Author, Oct 25, 2023, to @Dandandan, Contributor, Oct 24, 2023): Yes, we can remove it. Already fixed, thanks.
push_down_scan(&used_columns, scan, true)?
} else {
for expr in projection.expr.iter() {
Expand All @@ -163,17 +158,6 @@ impl OptimizerRule for PushDownProjection {
plan.with_new_inputs(&[new_scan])?
}
}
-            LogicalPlan::Values(values) if projection_is_empty => {
-                let field = find_small_field(values.schema.fields()).ok_or(
-                    DataFusionError::Internal("Values with empty schema".to_string()),
-                )?;
-                let column = Expr::Column(field.qualified_column());
-
-                LogicalPlan::Projection(Projection::try_new(
-                    vec![column],
-                    Arc::new(child_plan.clone()),
-                )?)
-            }
LogicalPlan::Union(union) => {
let mut required_columns = HashSet::new();
exprlist_to_columns(&projection.expr, &mut required_columns)?;
@@ -429,87 +413,6 @@ pub fn collect_projection_expr(projection: &Projection) -> HashMap<String, Expr>
.collect::<HashMap<_, _>>()
}

-/// Accumulate the memory size of a data type measured in bits.
-///
-/// Types with a variable size get assigned with a fixed size which is greater than most
-/// primitive types.
-///
-/// While traversing nested types, `nesting` is incremented on every level.
-fn nested_size(data_type: &DataType, nesting: &mut usize) -> usize {
-    use DataType::*;
-    if data_type.is_primitive() {
-        return data_type.primitive_width().unwrap_or(1) * 8;
-    }
-
-    if data_type.is_nested() {
-        *nesting += 1;
-    }
-
-    match data_type {
-        Null => 0,
-        Boolean => 1,
-        Binary | Utf8 => 128,
-        LargeBinary | LargeUtf8 => 256,
-        FixedSizeBinary(bytes) => (*bytes * 8) as usize,
-        // primitive types
-        Int8
-        | Int16
-        | Int32
-        | Int64
-        | UInt8
-        | UInt16
-        | UInt32
-        | UInt64
-        | Float16
-        | Float32
-        | Float64
-        | Timestamp(_, _)
-        | Date32
-        | Date64
-        | Time32(_)
-        | Time64(_)
-        | Duration(_)
-        | Interval(_)
-        | Dictionary(_, _)
-        | Decimal128(_, _)
-        | Decimal256(_, _) => data_type.primitive_width().unwrap_or(1) * 8,
-        // nested types
-        List(f) => nested_size(f.data_type(), nesting),
-        FixedSizeList(_, s) => (s * 8) as usize,
-        LargeList(f) => nested_size(f.data_type(), nesting),
-        Struct(fields) => fields
-            .iter()
-            .map(|f| nested_size(f.data_type(), nesting))
-            .sum(),
-        Union(fields, _) => fields
-            .iter()
-            .map(|(_, f)| nested_size(f.data_type(), nesting))
-            .sum(),
-        Map(field, _) => nested_size(field.data_type(), nesting),
-        RunEndEncoded(run_ends, values) => {
-            nested_size(run_ends.data_type(), nesting)
-                + nested_size(values.data_type(), nesting)
-        }
-    }
-}
-
-/// Find a field with a presumable small memory footprint based on its data type's memory size
-/// and the level of nesting.
-fn find_small_field(fields: &[DFField]) -> Option<DFField> {
-    fields
-        .iter()
-        .map(|f| {
-            let nesting = &mut 0;
-            let size = nested_size(f.data_type(), nesting);
-            (*nesting, size)
-        })
-        .enumerate()
-        .min_by(|(_, (nesting_a, size_a)), (_, (nesting_b, size_b))| {
-            nesting_a.cmp(nesting_b).then(size_a.cmp(size_b))
-        })
-        .map(|(i, _)| fields[i].clone())
-}

/// Get the projection exprs from columns in the order of the schema
fn get_expr(columns: &HashSet<Column>, schema: &DFSchemaRef) -> Result<Vec<Expr>> {
let expr = schema
Expand Down Expand Up @@ -640,7 +543,7 @@ mod tests {
use crate::optimizer::Optimizer;
use crate::test::*;
use crate::OptimizerContext;
-    use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
+    use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::DFSchema;
use datafusion_expr::builder::table_scan_with_filters;
use datafusion_expr::expr;
@@ -1232,73 +1135,4 @@ mod tests {
.unwrap_or(optimized_plan);
Ok(optimized_plan)
}

-    #[test]
-    fn test_nested_size() {
-        use DataType::*;
-        let nesting = &mut 0;
-        assert_eq!(nested_size(&Null, nesting), 0);
-        assert_eq!(*nesting, 0);
-        assert_eq!(nested_size(&Boolean, nesting), 1);
-        assert_eq!(*nesting, 0);
-        assert_eq!(nested_size(&UInt8, nesting), 8);
-        assert_eq!(*nesting, 0);
-        assert_eq!(nested_size(&Int64, nesting), 64);
-        assert_eq!(*nesting, 0);
-        assert_eq!(nested_size(&Decimal256(5, 2), nesting), 256);
-        assert_eq!(*nesting, 0);
-        assert_eq!(
-            nested_size(&List(Arc::new(Field::new("A", Int64, true))), nesting),
-            64
-        );
-        assert_eq!(*nesting, 1);
-        *nesting = 0;
-        assert_eq!(
-            nested_size(
-                &List(Arc::new(Field::new(
-                    "A",
-                    List(Arc::new(Field::new("AA", Int64, true))),
-                    true
-                ))),
-                nesting
-            ),
-            64
-        );
-        assert_eq!(*nesting, 2);
-    }
-
-    #[test]
-    fn test_find_small_field() {
-        use DataType::*;
-        let int32 = DFField::from(Field::new("a", Int32, false));
-        let bin = DFField::from(Field::new("b", Binary, false));
-        let list_i64 = DFField::from(Field::new(
-            "c",
-            List(Arc::new(Field::new("c_1", Int64, true))),
-            false,
-        ));
-        let time_s = DFField::from(Field::new("d", Time32(TimeUnit::Second), false));
-
-        assert_eq!(
-            find_small_field(&[
-                int32.clone(),
-                bin.clone(),
-                list_i64.clone(),
-                time_s.clone()
-            ]),
-            Some(int32.clone())
-        );
-        assert_eq!(
-            find_small_field(&[bin.clone(), list_i64.clone(), time_s.clone()]),
-            Some(time_s.clone())
-        );
-        assert_eq!(
-            find_small_field(&[time_s.clone(), int32.clone()]),
-            Some(time_s.clone())
-        );
-        assert_eq!(
-            find_small_field(&[bin.clone(), list_i64.clone()]),
-            Some(bin.clone())
-        );
-    }
}
4 changes: 3 additions & 1 deletion datafusion/physical-plan/src/joins/cross_join.rs
@@ -35,6 +35,7 @@ use crate::{

use arrow::datatypes::{Fields, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
+use arrow_array::RecordBatchOptions;
use datafusion_common::stats::Precision;
use datafusion_common::{plan_err, DataFusionError, Result, ScalarValue};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
@@ -347,13 +348,14 @@ fn build_batch(
})
.collect::<Result<Vec<_>>>()?;

-    RecordBatch::try_new(
+    RecordBatch::try_new_with_options(
Arc::new(schema.clone()),
arrays
.iter()
.chain(batch.columns().iter())
.cloned()
.collect(),
+    &RecordBatchOptions::new().with_row_count(Some(batch.num_rows())),
)
.map_err(Into::into)
}
Review thread on this change:

@Dandandan (Contributor, Oct 24, 2023): I wonder if we have more operators with empty inputs that use RecordBatch::try_new that should move to use RecordBatch::try_new_with_options (but are not covered in tests).

@haohuaijin (Contributor, Author): I have thought about this too, but I didn't come up with any more tests 😥.

@Dandandan: I think it is fine to merge as is and see if we get any regressions (hopefully not); those should be easy to fix in a similar manner.
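To illustrate the pattern discussed in the thread above, a simplified stand-in for build_batch (a sketch under stated assumptions, not the PR's exact code): the cross join chains the repeated left-side arrays with the right-side batch's columns, so when neither side projects a column the collected vector is empty and only the explicit row count preserves num_rows.

    use std::sync::Arc;

    use arrow_array::{ArrayRef, RecordBatch, RecordBatchOptions};
    use arrow_schema::{ArrowError, Schema};

    // Simplified stand-in for the cross join's batch builder.
    fn build_output(
        left_arrays: Vec<ArrayRef>,
        right: &RecordBatch,
        schema: Arc<Schema>,
    ) -> Result<RecordBatch, ArrowError> {
        RecordBatch::try_new_with_options(
            schema,
            left_arrays
                .iter()
                .chain(right.columns().iter())
                .cloned()
                .collect(),
            // Without this, a zero-column join output could not be built.
            &RecordBatchOptions::new().with_row_count(Some(right.num_rows())),
        )
    }

    fn main() -> Result<(), ArrowError> {
        // A right-side batch with no columns but three rows.
        let right = RecordBatch::try_new_with_options(
            Arc::new(Schema::empty()),
            vec![],
            &RecordBatchOptions::new().with_row_count(Some(3)),
        )?;
        // Zero columns from each side, yet the joined batch reports 3 rows.
        let joined = build_output(vec![], &right, Arc::new(Schema::empty()))?;
        assert_eq!(joined.num_rows(), 3);
        Ok(())
    }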
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/avro.slt
@@ -253,10 +253,10 @@ EXPLAIN SELECT count(*) from alltypes_plain
----
logical_plan
Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
---TableScan: alltypes_plain projection=[bool_col]
+--TableScan: alltypes_plain projection=[]
physical_plan
AggregateExec: mode=Final, gby=[], aggr=[COUNT(*)]
--CoalescePartitionsExec
----AggregateExec: mode=Partial, gby=[], aggr=[COUNT(*)]
------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
---------AvroExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/avro/alltypes_plain.avro]]}, projection=[bool_col]
+--------AvroExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/avro/alltypes_plain.avro]]}
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/json.slt
@@ -50,13 +50,13 @@ EXPLAIN SELECT count(*) from json_test
----
logical_plan
Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
---TableScan: json_test projection=[c]
+--TableScan: json_test projection=[]
physical_plan
AggregateExec: mode=Final, gby=[], aggr=[COUNT(*)]
--CoalescePartitionsExec
----AggregateExec: mode=Partial, gby=[], aggr=[COUNT(*)]
------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
---------JsonExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/2.json]]}, projection=[c]
+--------JsonExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/2.json]]}

query error DataFusion error: Schema error: No field named mycol\.
SELECT mycol FROM single_nan
6 changes: 3 additions & 3 deletions datafusion/sqllogictest/test_files/subquery.slt
@@ -695,7 +695,7 @@ logical_plan
Projection: __scalar_sq_1.COUNT(*) AS b
--SubqueryAlias: __scalar_sq_1
----Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
-------TableScan: t1 projection=[t1_id]
+------TableScan: t1 projection=[]
Review comment from a contributor on this line: 🎉


#simple_uncorrelated_scalar_subquery2
query TT
@@ -706,10 +706,10 @@ Projection: __scalar_sq_1.COUNT(*) AS b, __scalar_sq_2.COUNT(Int64(1)) AS COUNT(
--Left Join:
----SubqueryAlias: __scalar_sq_1
------Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
---------TableScan: t1 projection=[t1_id]
+--------TableScan: t1 projection=[]
----SubqueryAlias: __scalar_sq_2
------Aggregate: groupBy=[[]], aggr=[[COUNT(Int64(1))]]
---------TableScan: t2 projection=[t2_id]
+--------TableScan: t2 projection=[]

query II
select (select count(*) from t1) as b, (select count(1) from t2)