Cache common plan properties to eliminate recursive calls in physical plan #9346

Merged on Feb 28, 2024

Commits (26), changes shown from all commits:
fe0b1fe Initial commit (mustafasrepo, Feb 14, 2024)
27b6805 Update comments (mustafasrepo, Feb 15, 2024)
8d8cb8b Merge branch 'main' into feature/properties_caching (mustafasrepo, Feb 16, 2024)
c8cece8 Review Part 1 (ozankabak, Feb 21, 2024)
01eaf45 Merge branch 'main' into feature/properties_caching (mustafasrepo, Feb 21, 2024)
93f5282 Minor changes (mustafasrepo, Feb 21, 2024)
10000fb Delete docs.yaml (metesynnada, Feb 22, 2024)
6c76423 Merge pull request #5 from synnada-ai/ci-action-fixing (mustafasrepo, Feb 22, 2024)
599516b Merge branch 'apache:main' into main (mustafasrepo, Feb 23, 2024)
b20b65c Merge branch 'apache:main' into main (mustafasrepo, Feb 26, 2024)
ace9815 use create_cache_convention (mustafasrepo, Feb 22, 2024)
6c066b9 Merge branch 'apache:main' into main (mustafasrepo, Feb 26, 2024)
6770106 Merge remote-tracking branch 'upstream/main' (mustafasrepo, Feb 27, 2024)
07a438d Address reviews (mustafasrepo, Feb 27, 2024)
38db3d8 Merge branch 'apache:main' into main (mustafasrepo, Feb 27, 2024)
eced5bc Address reviews (mustafasrepo, Feb 27, 2024)
39e402a Merge branch 'main' into feature/remove_default_cache (mustafasrepo, Feb 27, 2024)
a012844 Update datafusion/physical-plan/src/lib.rs (mustafasrepo, Feb 27, 2024)
e4a9947 Update comments (mustafasrepo, Feb 27, 2024)
a940a46 Move properties to another trait. (mustafasrepo, Feb 27, 2024)
77e5c35 Final review (ozankabak, Feb 27, 2024)
4690037 Merge branch 'feature/remove_default_cache' of https://github.com/syn… (mustafasrepo, Feb 28, 2024)
930ac87 Resolve linter errors (mustafasrepo, Feb 28, 2024)
a8fac85 Bring docs yaml (mustafasrepo, Feb 28, 2024)
5e10243 Merge branch 'apache_main' into feature/remove_default_cache (mustafasrepo, Feb 28, 2024)
fea2174 Final reviews and cleanups (ozankabak, Feb 28, 2024)
18 changes: 8 additions & 10 deletions datafusion-cli/src/exec.rs
@@ -33,15 +33,14 @@ use crate::{
 
 use datafusion::common::plan_datafusion_err;
 use datafusion::datasource::listing::ListingTableUrl;
-use datafusion::datasource::physical_plan::is_plan_streaming;
 use datafusion::error::{DataFusionError, Result};
+use datafusion::logical_expr::dml::CopyTo;
 use datafusion::logical_expr::{CreateExternalTable, DdlStatement, LogicalPlan};
-use datafusion::physical_plan::{collect, execute_stream};
+use datafusion::physical_plan::{collect, execute_stream, ExecutionPlanProperties};
 use datafusion::prelude::SessionContext;
-use datafusion::sql::{parser::DFParser, sqlparser::dialect::dialect_from_str};
+use datafusion::sql::parser::{DFParser, Statement};
+use datafusion::sql::sqlparser::dialect::dialect_from_str;
 
-use datafusion::logical_expr::dml::CopyTo;
-use datafusion::sql::parser::Statement;
 use rustyline::error::ReadlineError;
 use rustyline::Editor;
 use tokio::signal;

@@ -231,7 +230,7 @@ async fn exec_and_print(
         let df = ctx.execute_logical_plan(plan).await?;
         let physical_plan = df.create_physical_plan().await?;
 
-        if is_plan_streaming(&physical_plan)? {
+        if physical_plan.execution_mode().is_unbounded() {
             let stream = execute_stream(physical_plan, task_ctx.clone())?;
             print_options.print_stream(stream, now).await?;
         } else {

@@ -305,10 +304,9 @@ mod tests {
     use std::str::FromStr;
 
     use super::*;
-    use datafusion::common::plan_err;
-    use datafusion_common::{
-        file_options::StatementOptions, FileType, FileTypeWriterOptions,
-    };
+
+    use datafusion::common::{plan_err, FileType, FileTypeWriterOptions};
+    use datafusion_common::file_options::StatementOptions;
 
     async fn create_external_table_test(location: &str, sql: &str) -> Result<()> {
         let ctx = SessionContext::new();
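Call sites change accordingly: the old is_plan_streaming helper gives way to cheap reads from the cached properties. A small sketch of the call-side pattern, assuming a DataFusion version that contains this PR (the describe helper is hypothetical; properties(), output_partitioning(), partition_count() and execution_mode() are the accessors that appear in the diffs here):

use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};

// Hypothetical helper: inspect a plan through its cached PlanProperties.
fn describe(plan: &dyn ExecutionPlan) {
    // Everything below is a cheap read from the per-node cache; none of
    // these calls recurse into the plan's children anymore.
    let props = plan.properties();
    println!(
        "output partitions: {}",
        props.output_partitioning().partition_count()
    );
    // The ExecutionPlanProperties trait added by this PR exposes the same
    // data as convenience methods directly on the plan:
    println!("unbounded input: {}", plan.execution_mode().is_unbounded());
}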
31 changes: 18 additions & 13 deletions datafusion-examples/examples/custom_datasource.rs
@@ -28,14 +28,14 @@ use datafusion::dataframe::DataFrame;
 use datafusion::datasource::{provider_as_source, TableProvider, TableType};
 use datafusion::error::Result;
 use datafusion::execution::context::{SessionState, TaskContext};
-use datafusion::physical_plan::expressions::PhysicalSortExpr;
 use datafusion::physical_plan::memory::MemoryStream;
 use datafusion::physical_plan::{
-    project_schema, DisplayAs, DisplayFormatType, ExecutionPlan,
-    SendableRecordBatchStream,
+    project_schema, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan,
+    Partitioning, PlanProperties, SendableRecordBatchStream,
 };
 use datafusion::prelude::*;
 use datafusion_expr::{Expr, LogicalPlanBuilder};
+use datafusion_physical_expr::EquivalenceProperties;
 
 use async_trait::async_trait;
 use tokio::time::timeout;

@@ -190,6 +190,7 @@ impl TableProvider for CustomDataSource {
 struct CustomExec {
     db: CustomDataSource,
     projected_schema: SchemaRef,
+    cache: PlanProperties,
 }
 
 impl CustomExec {

@@ -199,11 +200,23 @@ impl CustomExec {
         db: CustomDataSource,
     ) -> Self {
         let projected_schema = project_schema(&schema, projections).unwrap();
+        let cache = Self::compute_properties(projected_schema.clone());
         Self {
             db,
             projected_schema,
+            cache,
         }
     }
+
+    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
+    fn compute_properties(schema: SchemaRef) -> PlanProperties {
+        let eq_properties = EquivalenceProperties::new(schema);
+        PlanProperties::new(
+            eq_properties,
+            Partitioning::UnknownPartitioning(1),
+            ExecutionMode::Bounded,
+        )
+    }
 }
 
 impl DisplayAs for CustomExec {

@@ -217,16 +230,8 @@ impl ExecutionPlan for CustomExec {
         self
     }
 
-    fn schema(&self) -> SchemaRef {
-        self.projected_schema.clone()
-    }
-
-    fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning {
-        datafusion::physical_plan::Partitioning::UnknownPartitioning(1)
-    }
-
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        None
+    fn properties(&self) -> &PlanProperties {
+        &self.cache
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
18 changes: 8 additions & 10 deletions datafusion/core/benches/sort.rs
@@ -68,34 +68,32 @@
 
 use std::sync::Arc;
 
-use arrow::array::DictionaryArray;
-use arrow::datatypes::Int32Type;
 use arrow::{
-    array::{Float64Array, Int64Array, StringArray},
+    array::{DictionaryArray, Float64Array, Int64Array, StringArray},
     compute::SortOptions,
-    datatypes::Schema,
+    datatypes::{Int32Type, Schema},
     record_batch::RecordBatch,
 };
 
-/// Benchmarks for SortPreservingMerge stream
-use criterion::{criterion_group, criterion_main, Criterion};
 use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::{
     execution::context::TaskContext,
     physical_plan::{
-        memory::MemoryExec, sorts::sort_preserving_merge::SortPreservingMergeExec,
-        ExecutionPlan,
+        coalesce_partitions::CoalescePartitionsExec, memory::MemoryExec,
+        sorts::sort_preserving_merge::SortPreservingMergeExec, ExecutionPlan,
+        ExecutionPlanProperties,
     },
     prelude::SessionContext,
 };
 use datafusion_physical_expr::{expressions::col, PhysicalSortExpr};
+
+/// Benchmarks for SortPreservingMerge stream
+use criterion::{criterion_group, criterion_main, Criterion};
 use futures::StreamExt;
 use rand::rngs::StdRng;
 use rand::{Rng, SeedableRng};
 use tokio::runtime::Runtime;
 
-use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
-
 /// Total number of streams to divide each input into
 /// models 8 partition plan (should it be 16??)
 const NUM_STREAMS: usize = 8;
21 changes: 9 additions & 12 deletions datafusion/core/src/dataframe/mod.rs
@@ -23,15 +23,12 @@ mod parquet;
 use std::any::Any;
 use std::sync::Arc;
 
-use crate::arrow::datatypes::{Schema, SchemaRef};
 use crate::arrow::record_batch::RecordBatch;
 use crate::arrow::util::pretty;
 use crate::datasource::{provider_as_source, MemTable, TableProvider};
 use crate::error::Result;
-use crate::execution::{
-    context::{SessionState, TaskContext},
-    FunctionRegistry,
-};
+use crate::execution::context::{SessionState, TaskContext};
+use crate::execution::FunctionRegistry;
 use crate::logical_expr::utils::find_window_exprs;
 use crate::logical_expr::{
     col, Expr, JoinType, LogicalPlan, LogicalPlanBuilder, Partitioning, TableType,

@@ -40,11 +37,12 @@ use crate::physical_plan::{
     collect, collect_partitioned, execute_stream, execute_stream_partitioned,
     ExecutionPlan, SendableRecordBatchStream,
 };
+use crate::prelude::SessionContext;
 
 use arrow::array::{Array, ArrayRef, Int64Array, StringArray};
 use arrow::compute::{cast, concat};
 use arrow::csv::WriterBuilder;
-use arrow::datatypes::{DataType, Field};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion_common::file_options::csv_writer::CsvWriterOptions;
 use datafusion_common::file_options::json_writer::JsonWriterOptions;
 use datafusion_common::parsers::CompressionTypeVariant;

@@ -58,7 +56,6 @@ use datafusion_expr::{
     TableProviderFilterPushDown, UNNAMED_TABLE,
 };
 
-use crate::prelude::SessionContext;
 use async_trait::async_trait;
 
 /// Contains options that control how data is

@@ -1519,7 +1516,7 @@ mod tests {
         WindowFunctionDefinition,
     };
     use datafusion_physical_expr::expressions::Column;
-    use datafusion_physical_plan::get_plan_string;
+    use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};
 
     // Get string representation of the plan
     async fn assert_physical_plan(df: &DataFrame, expected: Vec<&str>) {

@@ -2907,7 +2904,7 @@ mod tests {
        // For non-partition aware union, the output partitioning count should be the combination of all output partitions count
        assert!(matches!(
            physical_plan.output_partitioning(),
-           Partitioning::UnknownPartitioning(partition_count) if partition_count == default_partition_count * 2));
+           Partitioning::UnknownPartitioning(partition_count) if *partition_count == default_partition_count * 2));
        Ok(())
    }
 

@@ -2956,7 +2953,7 @@ mod tests {
                ];
                assert_eq!(
                    out_partitioning,
-                   Partitioning::Hash(left_exprs, default_partition_count)
+                   &Partitioning::Hash(left_exprs, default_partition_count)
                );
            }
            JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => {

@@ -2966,13 +2963,13 @@ mod tests {
                ];
                assert_eq!(
                    out_partitioning,
-                   Partitioning::Hash(right_exprs, default_partition_count)
+                   &Partitioning::Hash(right_exprs, default_partition_count)
                );
            }
            JoinType::Full => {
                assert!(matches!(
                    out_partitioning,
-                   Partitioning::UnknownPartitioning(partition_count) if partition_count == default_partition_count));
+                   &Partitioning::UnknownPartitioning(partition_count) if partition_count == default_partition_count));
            }
        }
    }
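The &/* tweaks in the test assertions above follow from the API change: output_partitioning() now returns a reference into the node's cached PlanProperties rather than an owned value, so equality checks compare references and pattern bindings come out behind a reference. A self-contained illustration with a toy enum (not DataFusion's own type):

#[derive(Debug, PartialEq)]
enum Partitioning {
    UnknownPartitioning(usize),
}

fn main() {
    let cached = Partitioning::UnknownPartitioning(8);
    // What output_partitioning() hands back after this PR: a borrow, not a value.
    let borrowed: &Partitioning = &cached;

    // Matching through a reference binds `partition_count` as `&usize`,
    // hence the `*` in the guard.
    assert!(matches!(
        borrowed,
        Partitioning::UnknownPartitioning(partition_count)
            if *partition_count == 8
    ));

    // Direct equality now compares reference to reference, hence the `&`.
    assert_eq!(borrowed, &Partitioning::UnknownPartitioning(8));
    println!("ok");
}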
8 changes: 5 additions & 3 deletions datafusion/core/src/datasource/listing/table.rs
@@ -27,7 +27,9 @@ use super::PartitionedFile;
 #[cfg(feature = "parquet")]
 use crate::datasource::file_format::parquet::ParquetFormat;
 use crate::datasource::{
-    create_ordering,
+    create_ordering, get_statistics_with_limit, TableProvider, TableType,
+};
+use crate::datasource::{
     file_format::{
         arrow::ArrowFormat,
         avro::AvroFormat,

@@ -36,10 +38,8 @@ use crate::datasource::{
         json::JsonFormat,
         FileFormat,
     },
-    get_statistics_with_limit,
     listing::ListingTableUrl,
     physical_plan::{FileScanConfig, FileSinkConfig},
-    TableProvider, TableType,
 };
 use crate::{
     error::{DataFusionError, Result},

@@ -920,6 +920,8 @@ mod tests {
     use datafusion_common::{assert_contains, GetExt, ScalarValue};
     use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator};
     use datafusion_physical_expr::PhysicalSortExpr;
+    use datafusion_physical_plan::ExecutionPlanProperties;
+
     use tempfile::TempDir;
 
     #[tokio::test]
40 changes: 23 additions & 17 deletions datafusion/core/src/datasource/memory.rs
@@ -17,34 +17,37 @@
 
 //! [`MemTable`] for querying `Vec<RecordBatch>` by DataFusion.
 
-use datafusion_physical_plan::metrics::MetricsSet;
-use futures::StreamExt;
-use log::debug;
 use std::any::Any;
 use std::collections::HashMap;
 use std::fmt::{self, Debug};
 use std::sync::Arc;
 
-use arrow::datatypes::SchemaRef;
-use arrow::record_batch::RecordBatch;
-use async_trait::async_trait;
-use datafusion_common::{not_impl_err, plan_err, Constraints, DFSchema, SchemaExt};
-use datafusion_execution::TaskContext;
-use parking_lot::Mutex;
-use tokio::sync::RwLock;
-use tokio::task::JoinSet;
-
 use crate::datasource::{TableProvider, TableType};
 use crate::error::Result;
 use crate::execution::context::SessionState;
 use crate::logical_expr::Expr;
 use crate::physical_plan::insert::{DataSink, FileSinkExec};
 use crate::physical_plan::memory::MemoryExec;
-use crate::physical_plan::{common, SendableRecordBatchStream};
-use crate::physical_plan::{repartition::RepartitionExec, Partitioning};
-use crate::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
+use crate::physical_plan::repartition::RepartitionExec;
+use crate::physical_plan::{
+    common, DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties,
+    Partitioning, SendableRecordBatchStream,
+};
 use crate::physical_planner::create_physical_sort_expr;
 
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{not_impl_err, plan_err, Constraints, DFSchema, SchemaExt};
+use datafusion_execution::TaskContext;
+use datafusion_physical_plan::metrics::MetricsSet;
+
+use async_trait::async_trait;
+use futures::StreamExt;
+use log::debug;
+use parking_lot::Mutex;
+use tokio::sync::RwLock;
+use tokio::task::JoinSet;
+
 /// Type alias for partition data
 pub type PartitionData = Arc<RwLock<Vec<RecordBatch>>>;

@@ -166,7 +169,7 @@ impl MemTable {
 
         // execute and collect results
         let mut output_partitions = vec![];
-        for i in 0..exec.output_partitioning().partition_count() {
+        for i in 0..exec.properties().output_partitioning().partition_count() {
             // execute this *output* partition and collect all batches
             let task_ctx = state.task_ctx();
             let mut stream = exec.execute(i, task_ctx)?;

@@ -361,17 +364,20 @@ impl DataSink for MemSink {
 
 #[cfg(test)]
 mod tests {
+    use std::collections::HashMap;
+
     use super::*;
     use crate::datasource::provider_as_source;
     use crate::physical_plan::collect;
     use crate::prelude::SessionContext;
+
     use arrow::array::{AsArray, Int32Array};
     use arrow::datatypes::{DataType, Field, Schema, UInt64Type};
     use arrow::error::ArrowError;
     use datafusion_common::DataFusionError;
     use datafusion_expr::LogicalPlanBuilder;
+
     use futures::StreamExt;
-    use std::collections::HashMap;
 
     #[tokio::test]
     async fn test_with_projection() -> Result<()> {
         let ctx = SessionContext::new();