Split EmptyExec into PlaceholderRowExec #8446

Merged 5 commits on Dec 9, 2023
Changes from 3 commits
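In short, this PR removes the `produce_one_row` flag from `EmptyExec` and moves the one-row case into a new `PlaceHolderRowExec` operator. A rough sketch of the resulting API split (not taken verbatim from the PR; module paths follow the imports visible in the diffs below):

```rust
use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use datafusion_physical_plan::empty::EmptyExec;
use datafusion_physical_plan::placeholder_row::PlaceHolderRowExec;
use datafusion_physical_plan::ExecutionPlan;

// Before: EmptyExec::new(false, schema) vs. EmptyExec::new(true, schema).
// After: each case is its own operator and the boolean flag is gone.

/// Relation that yields zero rows (e.g. an empty table scan).
fn empty_plan(schema: SchemaRef) -> Arc<dyn ExecutionPlan> {
    Arc::new(EmptyExec::new(schema))
}

/// Relation that yields exactly one placeholder row (e.g. a query with no FROM clause).
fn one_row_plan(schema: SchemaRef) -> Arc<dyn ExecutionPlan> {
    Arc::new(PlaceHolderRowExec::new(schema))
}
```

This keeps `EmptyExec` trivially zero-row, while display output and the statistics-based optimizations handle the placeholder-row case through the dedicated operator, as the diffs below show.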
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/empty.rs
@@ -77,7 +77,7 @@ impl TableProvider for EmptyTable {
// even though there is no data, projections apply
let projected_schema = project_schema(&self.schema, projection)?;
Ok(Arc::new(
EmptyExec::new(false, projected_schema).with_partitions(self.partitions),
EmptyExec::new(projected_schema).with_partitions(self.partitions),
))
}
}
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/listing/table.rs
@@ -685,7 +685,7 @@ impl TableProvider for ListingTable {
if partitioned_file_lists.is_empty() {
let schema = self.schema();
let projected_schema = project_schema(&schema, projection)?;
return Ok(Arc::new(EmptyExec::new(false, projected_schema)));
return Ok(Arc::new(EmptyExec::new(projected_schema)));
}

// extract types of partition columns
@@ -713,7 +713,7 @@
let object_store_url = if let Some(url) = self.table_paths.get(0) {
url.object_store()
} else {
return Ok(Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))));
return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))));
};
// create the execution plan
self.options
datafusion/core/src/physical_optimizer/aggregate_statistics.rs
@@ -22,14 +22,14 @@ use super::optimizer::PhysicalOptimizerRule;
use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_plan::aggregates::AggregateExec;
use crate::physical_plan::empty::EmptyExec;
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::{expressions, AggregateExpr, ExecutionPlan, Statistics};
use crate::scalar::ScalarValue;

use datafusion_common::stats::Precision;
use datafusion_common::tree_node::TreeNode;
use datafusion_expr::utils::COUNT_STAR_EXPANSION;
use datafusion_physical_plan::placeholder_row::PlaceHolderRowExec;

/// Optimizer that uses available statistics for aggregate functions
#[derive(Default)]
@@ -82,7 +82,7 @@ impl PhysicalOptimizerRule for AggregateStatistics {
// input can be entirely removed
Ok(Arc::new(ProjectionExec::try_new(
projections,
Arc::new(EmptyExec::new(true, plan.schema())),
Arc::new(PlaceHolderRowExec::new(plan.schema())),
)?))
} else {
plan.map_children(|child| self.optimize(child, _config))
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_optimizer/join_selection.rs
@@ -1623,12 +1623,12 @@ mod hash_join_tests {

let children = vec![
PipelineStatePropagator {
plan: Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))),
plan: Arc::new(EmptyExec::new(Arc::new(Schema::empty()))),
unbounded: left_unbounded,
children: vec![],
},
PipelineStatePropagator {
plan: Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))),
plan: Arc::new(EmptyExec::new(Arc::new(Schema::empty()))),
unbounded: right_unbounded,
children: vec![],
},
12 changes: 9 additions & 3 deletions datafusion/core/src/physical_planner.rs
@@ -91,6 +91,7 @@ use datafusion_expr::{
WindowFrameBound, WriteOp,
};
use datafusion_physical_expr::expressions::Literal;
use datafusion_physical_plan::placeholder_row::PlaceHolderRowExec;
use datafusion_sql::utils::window_expr_common_partition_keys;

use async_trait::async_trait;
@@ -1196,10 +1197,15 @@ impl DefaultPhysicalPlanner {
}
LogicalPlan::Subquery(_) => todo!(),
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row,
produce_one_row: false,
schema,
}) => Ok(Arc::new(EmptyExec::new(
*produce_one_row,
SchemaRef::new(schema.as_ref().to_owned().into()),
))),
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: true,
schema,
}) => Ok(Arc::new(PlaceHolderRowExec::new(
SchemaRef::new(schema.as_ref().to_owned().into()),
))),
LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => {
@@ -2767,7 +2773,7 @@ mod tests {

digraph {
1[shape=box label="ProjectionExec: expr=[id@0 + 2 as employee.id + Int32(2)]", tooltip=""]
2[shape=box label="EmptyExec: produce_one_row=false", tooltip=""]
2[shape=box label="EmptyExec", tooltip=""]
1 -> 2 [arrowhead=none, arrowtail=normal, dir=back]
}
// End DataFusion GraphViz Plan
12 changes: 6 additions & 6 deletions datafusion/core/tests/custom_sources.rs
@@ -30,7 +30,6 @@ use datafusion::execution::context::{SessionContext, SessionState, TaskContext};
use datafusion::logical_expr::{
col, Expr, LogicalPlan, LogicalPlanBuilder, TableScan, UNNAMED_TABLE,
};
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::{
collect, ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
@@ -42,6 +41,7 @@ use datafusion_common::project_schema;
use datafusion_common::stats::Precision;

use async_trait::async_trait;
use datafusion_physical_plan::placeholder_row::PlaceHolderRowExec;
use futures::stream::Stream;

/// Also run all tests that are found in the `custom_sources_cases` directory
@@ -256,9 +256,9 @@ async fn optimizers_catch_all_statistics() {

let physical_plan = df.create_physical_plan().await.unwrap();

// when the optimization kicks in, the source is replaced by an EmptyExec
// when the optimization kicks in, the source is replaced by a PlaceHolderRowExec
assert!(
contains_empty_exec(Arc::clone(&physical_plan)),
contains_place_holder_exec(Arc::clone(&physical_plan)),
"Expected aggregate_statistics optimizations missing: {physical_plan:?}"
);

@@ -283,12 +283,12 @@
assert_eq!(format!("{:?}", actual[0]), format!("{expected:?}"));
}

fn contains_empty_exec(plan: Arc<dyn ExecutionPlan>) -> bool {
if plan.as_any().is::<EmptyExec>() {
fn contains_place_holder_exec(plan: Arc<dyn ExecutionPlan>) -> bool {
if plan.as_any().is::<PlaceHolderRowExec>() {
true
} else if plan.children().len() != 1 {
false
} else {
contains_empty_exec(Arc::clone(&plan.children()[0]))
contains_place_holder_exec(Arc::clone(&plan.children()[0]))
}
}
4 changes: 2 additions & 2 deletions datafusion/core/tests/sql/explain_analyze.rs
@@ -575,7 +575,7 @@ async fn explain_analyze_runs_optimizers() {

// This happens as an optimization pass where count(*) can be
// answered using statistics only.
let expected = "EmptyExec: produce_one_row=true";
let expected = "PlaceHolderRowExec";

let sql = "EXPLAIN SELECT count(*) from alltypes_plain";
let actual = execute_to_batches(&ctx, sql).await;
@@ -806,7 +806,7 @@ async fn explain_physical_plan_only() {
let expected = vec![vec![
"physical_plan",
"ProjectionExec: expr=[2 as COUNT(*)]\
\n EmptyExec: produce_one_row=true\
\n PlaceHolderRowExec\
\n",
]];
assert_eq!(expected, actual);
6 changes: 3 additions & 3 deletions datafusion/optimizer/README.md
@@ -153,7 +153,7 @@ Looking at the `EXPLAIN` output we can see that the optimizer has effectively re
| logical_plan | Projection: Int64(3) AS Int64(1) + Int64(2) |
| | EmptyRelation |
| physical_plan | ProjectionExec: expr=[3 as Int64(1) + Int64(2)] |
| | EmptyExec: produce_one_row=true |
| | PlaceHolderRowExec |
| | |
+---------------+-------------------------------------------------+
```
@@ -318,15 +318,15 @@ In the following example, the `type_coercion` and `simplify_expressions` passes
| logical_plan | Projection: Utf8("3.2") AS foo |
| | EmptyRelation |
| initial_physical_plan | ProjectionExec: expr=[3.2 as foo] |
| | EmptyExec: produce_one_row=true |
| | PlaceHolderRowExec |
| | |
| physical_plan after aggregate_statistics | SAME TEXT AS ABOVE |
| physical_plan after join_selection | SAME TEXT AS ABOVE |
| physical_plan after coalesce_batches | SAME TEXT AS ABOVE |
| physical_plan after repartition | SAME TEXT AS ABOVE |
| physical_plan after add_merge_exec | SAME TEXT AS ABOVE |
| physical_plan | ProjectionExec: expr=[3.2 as foo] |
| | EmptyExec: produce_one_row=true |
| | PlaceHolderRowExec |
| | |
+------------------------------------------------------------+---------------------------------------------------------------------------+
```
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/display.rs
@@ -132,7 +132,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
/// ```dot
/// strict digraph dot_plan {
// 0[label="ProjectionExec: expr=[id@0 + 2 as employee.id + Int32(2)]",tooltip=""]
// 1[label="EmptyExec: produce_one_row=false",tooltip=""]
// 1[label="EmptyExec",tooltip=""]
// 0 -> 1
// }
/// ```
93 changes: 10 additions & 83 deletions datafusion/physical-plan/src/empty.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! EmptyRelation execution plan
//! EmptyRelation with produce_one_row=false execution plan

use std::any::Any;
use std::sync::Arc;
@@ -24,19 +24,16 @@ use super::expressions::PhysicalSortExpr;
use super::{common, DisplayAs, SendableRecordBatchStream, Statistics};
use crate::{memory::MemoryStream, DisplayFormatType, ExecutionPlan, Partitioning};

use arrow::array::{ArrayRef, NullArray};
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::{internal_err, DataFusionError, Result};
use datafusion_execution::TaskContext;

use log::trace;

/// Execution plan for empty relation (produces no rows)
/// Execution plan for empty relation with produce_one_row=false
#[derive(Debug)]
pub struct EmptyExec {
/// Specifies whether this exec produces a row or not
produce_one_row: bool,
/// The schema for the produced row
schema: SchemaRef,
/// Number of partitions
@@ -45,9 +42,8 @@ pub struct EmptyExec {

impl EmptyExec {
/// Create a new EmptyExec
pub fn new(produce_one_row: bool, schema: SchemaRef) -> Self {
pub fn new(schema: SchemaRef) -> Self {
EmptyExec {
produce_one_row,
schema,
partitions: 1,
}
@@ -59,36 +55,8 @@ impl EmptyExec {
self
}

/// Specifies whether this exec produces a row or not
pub fn produce_one_row(&self) -> bool {
self.produce_one_row
}

fn data(&self) -> Result<Vec<RecordBatch>> {
let batch = if self.produce_one_row {
let n_field = self.schema.fields.len();
// hack for https://github.com/apache/arrow-datafusion/pull/3242
let n_field = if n_field == 0 { 1 } else { n_field };
vec![RecordBatch::try_new(
Arc::new(Schema::new(
(0..n_field)
.map(|i| {
Field::new(format!("placeholder_{i}"), DataType::Null, true)
})
.collect::<Fields>(),
)),
(0..n_field)
.map(|_i| {
let ret: ArrayRef = Arc::new(NullArray::new(1));
ret
})
.collect(),
)?]
} else {
vec![]
};

Ok(batch)
Ok(vec![])
}
}

@@ -100,7 +68,7 @@ impl DisplayAs for EmptyExec {
) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "EmptyExec: produce_one_row={}", self.produce_one_row)
write!(f, "EmptyExec")
}
}
}
@@ -133,10 +101,7 @@ impl ExecutionPlan for EmptyExec {
self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(EmptyExec::new(
self.produce_one_row,
self.schema.clone(),
)))
Ok(Arc::new(EmptyExec::new(self.schema.clone())))
}

fn execute(
@@ -184,7 +149,7 @@ mod tests {
let task_ctx = Arc::new(TaskContext::default());
let schema = test::aggr_test_schema();

let empty = EmptyExec::new(false, schema.clone());
let empty = EmptyExec::new(schema.clone());
assert_eq!(empty.schema(), schema);

// we should have no results
@@ -198,16 +163,11 @@
#[test]
fn with_new_children() -> Result<()> {
let schema = test::aggr_test_schema();
let empty = Arc::new(EmptyExec::new(false, schema.clone()));
let empty_with_row = Arc::new(EmptyExec::new(true, schema));
let empty = Arc::new(EmptyExec::new(schema.clone()));

let empty2 = with_new_children_if_necessary(empty.clone(), vec![])?.into();
assert_eq!(empty.schema(), empty2.schema());

let empty_with_row_2 =
with_new_children_if_necessary(empty_with_row.clone(), vec![])?.into();
assert_eq!(empty_with_row.schema(), empty_with_row_2.schema());

let too_many_kids = vec![empty2];
assert!(
with_new_children_if_necessary(empty, too_many_kids).is_err(),
@@ -220,44 +180,11 @@
async fn invalid_execute() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
let schema = test::aggr_test_schema();
let empty = EmptyExec::new(false, schema);
let empty = EmptyExec::new(schema);

// ask for the wrong partition
assert!(empty.execute(1, task_ctx.clone()).is_err());
assert!(empty.execute(20, task_ctx).is_err());
Ok(())
}

#[tokio::test]
async fn produce_one_row() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
let schema = test::aggr_test_schema();
let empty = EmptyExec::new(true, schema);

let iter = empty.execute(0, task_ctx)?;
let batches = common::collect(iter).await?;

// should have one item
assert_eq!(batches.len(), 1);

Ok(())
}

#[tokio::test]
async fn produce_one_row_multiple_partition() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
let schema = test::aggr_test_schema();
let partitions = 3;
let empty = EmptyExec::new(true, schema).with_partitions(partitions);

for n in 0..partitions {
let iter = empty.execute(n, task_ctx.clone())?;
let batches = common::collect(iter).await?;

// should have one item
assert_eq!(batches.len(), 1);
}

Ok(())
}
}
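The one-row branch deleted from `EmptyExec::data()` above presumably lives on in the new `placeholder_row` module registered in `lib.rs` below (that file's body is not shown in this excerpt). A sketch of that logic, reconstructed from the deleted lines; the free-function name `placeholder_batch` is only for illustration:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, NullArray};
use arrow::datatypes::{DataType, Field, Fields, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;

/// Build the single all-NULL placeholder row, mirroring the branch removed
/// from `EmptyExec::data()`.
fn placeholder_batch(schema: &Schema) -> Result<Vec<RecordBatch>> {
    let n_field = schema.fields().len();
    // hack for https://github.com/apache/arrow-datafusion/pull/3242:
    // keep at least one nullable Null column so the batch still reports one row
    let n_field = if n_field == 0 { 1 } else { n_field };
    let batch = RecordBatch::try_new(
        Arc::new(Schema::new(
            (0..n_field)
                .map(|i| Field::new(format!("placeholder_{i}"), DataType::Null, true))
                .collect::<Fields>(),
        )),
        (0..n_field)
            .map(|_| Arc::new(NullArray::new(1)) as ArrayRef)
            .collect(),
    )?;
    Ok(vec![batch])
}
```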
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/lib.rs
@@ -59,6 +59,7 @@ pub mod limit;
pub mod memory;
pub mod metrics;
mod ordering;
pub mod placeholder_row;
pub mod projection;
pub mod repartition;
pub mod sorts;