Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache common plan properties to eliminate recursive calls in physical plan #9346

Merged
merged 26 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
fe0b1fe
Initial commit
mustafasrepo Feb 14, 2024
27b6805
Update comments
mustafasrepo Feb 15, 2024
8d8cb8b
Merge branch 'main' into feature/properties_caching
mustafasrepo Feb 16, 2024
c8cece8
Review Part 1
ozankabak Feb 21, 2024
01eaf45
Merge branch 'main' into feature/properties_caching
mustafasrepo Feb 21, 2024
93f5282
Minor changes
mustafasrepo Feb 21, 2024
10000fb
Delete docs.yaml
metesynnada Feb 22, 2024
6c76423
Merge pull request #5 from synnada-ai/ci-action-fixing
mustafasrepo Feb 22, 2024
599516b
Merge branch 'apache:main' into main
mustafasrepo Feb 23, 2024
b20b65c
Merge branch 'apache:main' into main
mustafasrepo Feb 26, 2024
ace9815
use create_cache_convention
mustafasrepo Feb 22, 2024
6c066b9
Merge branch 'apache:main' into main
mustafasrepo Feb 26, 2024
6770106
Merge remote-tracking branch 'upstream/main'
mustafasrepo Feb 27, 2024
07a438d
Address reviews
mustafasrepo Feb 27, 2024
38db3d8
Merge branch 'apache:main' into main
mustafasrepo Feb 27, 2024
eced5bc
Address reviews
mustafasrepo Feb 27, 2024
39e402a
Merge branch 'main' into feature/remove_default_cache
mustafasrepo Feb 27, 2024
a012844
Update datafusion/physical-plan/src/lib.rs
mustafasrepo Feb 27, 2024
e4a9947
Update comments
mustafasrepo Feb 27, 2024
a940a46
Move properties to another trait.
mustafasrepo Feb 27, 2024
77e5c35
Final review
ozankabak Feb 27, 2024
4690037
Merge branch 'feature/remove_default_cache' of https://github.com/syn…
mustafasrepo Feb 28, 2024
930ac87
Resolve linter errors
mustafasrepo Feb 28, 2024
a8fac85
Bring docs yaml
mustafasrepo Feb 28, 2024
5e10243
Merge branch 'apache_main' into feature/remove_default_cache
mustafasrepo Feb 28, 2024
fea2174
Final reviews and cleanups
ozankabak Feb 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,14 @@ use crate::{

use datafusion::common::plan_datafusion_err;
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::datasource::physical_plan::is_plan_streaming;
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_expr::dml::CopyTo;
use datafusion::logical_expr::{CreateExternalTable, DdlStatement, LogicalPlan};
use datafusion::physical_plan::{collect, execute_stream};
use datafusion::prelude::SessionContext;
use datafusion::sql::{parser::DFParser, sqlparser::dialect::dialect_from_str};
use datafusion::sql::parser::{DFParser, Statement};
use datafusion::sql::sqlparser::dialect::dialect_from_str;

use datafusion::logical_expr::dml::CopyTo;
use datafusion::sql::parser::Statement;
use rustyline::error::ReadlineError;
use rustyline::Editor;
use tokio::signal;
Expand Down Expand Up @@ -231,7 +230,7 @@ async fn exec_and_print(
let df = ctx.execute_logical_plan(plan).await?;
let physical_plan = df.create_physical_plan().await?;

if is_plan_streaming(&physical_plan)? {
if physical_plan.execution_mode().is_unbounded() {
let stream = execute_stream(physical_plan, task_ctx.clone())?;
print_options.print_stream(stream, now).await?;
} else {
Expand Down Expand Up @@ -305,10 +304,9 @@ mod tests {
use std::str::FromStr;

use super::*;
use datafusion::common::plan_err;
use datafusion_common::{
file_options::StatementOptions, FileType, FileTypeWriterOptions,
};

use datafusion::common::{plan_err, FileType, FileTypeWriterOptions};
use datafusion_common::file_options::StatementOptions;

async fn create_external_table_test(location: &str, sql: &str) -> Result<()> {
let ctx = SessionContext::new();
Expand Down
31 changes: 18 additions & 13 deletions datafusion-examples/examples/custom_datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ use datafusion::dataframe::DataFrame;
use datafusion::datasource::{provider_as_source, TableProvider, TableType};
use datafusion::error::Result;
use datafusion::execution::context::{SessionState, TaskContext};
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::memory::MemoryStream;
use datafusion::physical_plan::{
project_schema, DisplayAs, DisplayFormatType, ExecutionPlan,
SendableRecordBatchStream,
project_schema, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan,
Partitioning, PlanPropertiesCache, SendableRecordBatchStream,
};
use datafusion::prelude::*;
use datafusion_expr::{Expr, LogicalPlanBuilder};
use datafusion_physical_expr::EquivalenceProperties;

use async_trait::async_trait;
use tokio::time::timeout;
Expand Down Expand Up @@ -190,6 +190,7 @@ impl TableProvider for CustomDataSource {
struct CustomExec {
db: CustomDataSource,
projected_schema: SchemaRef,
cache: PlanPropertiesCache,
}

impl CustomExec {
Expand All @@ -199,11 +200,23 @@ impl CustomExec {
db: CustomDataSource,
) -> Self {
let projected_schema = project_schema(&schema, projections).unwrap();
let cache = Self::create_cache(projected_schema.clone());
Self {
db,
projected_schema,
cache,
}
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(schema: SchemaRef) -> PlanPropertiesCache {
mustafasrepo marked this conversation as resolved.
Show resolved Hide resolved
let eq_properties = EquivalenceProperties::new(schema);
PlanPropertiesCache::new(
eq_properties,
Partitioning::UnknownPartitioning(1),
ExecutionMode::Bounded,
)
}
}

impl DisplayAs for CustomExec {
Expand All @@ -217,16 +230,8 @@ impl ExecutionPlan for CustomExec {
self
}

fn schema(&self) -> SchemaRef {
self.projected_schema.clone()
}

fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning {
datafusion::physical_plan::Partitioning::UnknownPartitioning(1)
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
fn cache(&self) -> &PlanPropertiesCache {
&self.cache
mustafasrepo marked this conversation as resolved.
Show resolved Hide resolved
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
Expand Down
16 changes: 7 additions & 9 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@ use crate::arrow::record_batch::RecordBatch;
use crate::arrow::util::pretty;
use crate::datasource::{provider_as_source, MemTable, TableProvider};
use crate::error::Result;
use crate::execution::{
context::{SessionState, TaskContext},
FunctionRegistry,
};
use crate::execution::context::{SessionState, TaskContext};
use crate::execution::FunctionRegistry;
use crate::logical_expr::utils::find_window_exprs;
use crate::logical_expr::{
col, Expr, JoinType, LogicalPlan, LogicalPlanBuilder, Partitioning, TableType,
Expand All @@ -40,6 +38,7 @@ use crate::physical_plan::{
collect, collect_partitioned, execute_stream, execute_stream_partitioned,
ExecutionPlan, SendableRecordBatchStream,
};
use crate::prelude::SessionContext;

use arrow::array::{Array, ArrayRef, Int64Array, StringArray};
use arrow::compute::{cast, concat};
Expand All @@ -58,7 +57,6 @@ use datafusion_expr::{
TableProviderFilterPushDown, UNNAMED_TABLE,
};

use crate::prelude::SessionContext;
use async_trait::async_trait;

/// Contains options that control how data is
Expand Down Expand Up @@ -2902,7 +2900,7 @@ mod tests {
// For non-partition aware union, the output partitioning count should be the combination of all output partitions count
assert!(matches!(
physical_plan.output_partitioning(),
Partitioning::UnknownPartitioning(partition_count) if partition_count == default_partition_count * 2));
Partitioning::UnknownPartitioning(partition_count) if *partition_count == default_partition_count * 2));
Ok(())
}

Expand Down Expand Up @@ -2951,7 +2949,7 @@ mod tests {
];
assert_eq!(
out_partitioning,
Partitioning::Hash(left_exprs, default_partition_count)
&Partitioning::Hash(left_exprs, default_partition_count)
);
}
JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => {
Expand All @@ -2961,13 +2959,13 @@ mod tests {
];
assert_eq!(
out_partitioning,
Partitioning::Hash(right_exprs, default_partition_count)
&Partitioning::Hash(right_exprs, default_partition_count)
);
}
JoinType::Full => {
assert!(matches!(
out_partitioning,
Partitioning::UnknownPartitioning(partition_count) if partition_count == default_partition_count));
&Partitioning::UnknownPartitioning(partition_count) if partition_count == default_partition_count));
}
}
}
Expand Down
67 changes: 43 additions & 24 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
use std::any::Any;
use std::sync::Arc;

use super::FileGroupPartitioner;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::physical_plan::{
FileMeta, FileOpenFuture, FileOpener, FileScanConfig,
};
Expand All @@ -34,14 +36,13 @@ use arrow_schema::SchemaRef;
use datafusion_common::config::ConfigOptions;
use datafusion_common::Statistics;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalSortExpr};
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
use datafusion_physical_plan::{ExecutionMode, PlanPropertiesCache};

use futures::StreamExt;
use itertools::Itertools;
use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore};

use super::FileGroupPartitioner;

/// Execution plan for scanning Arrow data source
#[derive(Debug, Clone)]
#[allow(dead_code)]
Expand All @@ -52,26 +53,61 @@ pub struct ArrowExec {
projected_output_ordering: Vec<LexOrdering>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
cache: PlanPropertiesCache,
}

impl ArrowExec {
/// Create a new Arrow reader execution plan provided base configurations
pub fn new(base_config: FileScanConfig) -> Self {
let (projected_schema, projected_statistics, projected_output_ordering) =
base_config.project();

let cache = Self::create_cache(
projected_schema.clone(),
&projected_output_ordering,
&base_config,
);
Self {
base_config,
projected_schema,
projected_statistics,
projected_output_ordering,
metrics: ExecutionPlanMetricsSet::new(),
cache,
}
}
/// Ref to the base configs
pub fn base_config(&self) -> &FileScanConfig {
&self.base_config
}

fn output_partitioning_helper(file_scan_config: &FileScanConfig) -> Partitioning {
Partitioning::UnknownPartitioning(file_scan_config.file_groups.len())
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(
schema: SchemaRef,
projected_output_ordering: &[LexOrdering],
file_scan_config: &FileScanConfig,
) -> PlanPropertiesCache {
// Equivalence Properties
let eq_properties =
EquivalenceProperties::new_with_orderings(schema, projected_output_ordering);

PlanPropertiesCache::new(
eq_properties,
Self::output_partitioning_helper(file_scan_config), // Output Partitioning
ExecutionMode::Bounded, // Execution Mode
)
}

fn with_file_groups(mut self, file_groups: Vec<Vec<PartitionedFile>>) -> Self {
self.base_config.file_groups = file_groups;
// Changing file groups may invalidate output partitioning. Update it also
let output_partitioning = Self::output_partitioning_helper(&self.base_config);
self.cache = self.cache.with_partitioning(output_partitioning);
self
}
}

impl DisplayAs for ArrowExec {
Expand All @@ -90,25 +126,8 @@ impl ExecutionPlan for ArrowExec {
self
}

fn schema(&self) -> SchemaRef {
self.projected_schema.clone()
}

fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
self.projected_output_ordering
.first()
.map(|ordering| ordering.as_slice())
}

fn equivalence_properties(&self) -> EquivalenceProperties {
EquivalenceProperties::new_with_orderings(
self.schema(),
&self.projected_output_ordering,
)
fn cache(&self) -> &PlanPropertiesCache {
&self.cache
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
Expand Down Expand Up @@ -138,7 +157,7 @@ impl ExecutionPlan for ArrowExec {

if let Some(repartitioned_file_groups) = repartitioned_file_groups_option {
let mut new_plan = self.clone();
new_plan.base_config.file_groups = repartitioned_file_groups;
new_plan = new_plan.with_file_groups(repartitioned_file_groups);
return Ok(Some(Arc::new(new_plan)));
}
Ok(None)
Expand Down
Loading
Loading