diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index a6c1597162c0..4fa7a806b309 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -45,6 +45,12 @@ dependencies = [ "alloc-no-stdlib", ] +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + [[package]] name = "android_system_properties" version = "0.1.5" @@ -302,7 +308,7 @@ checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842" dependencies = [ "proc-macro2", "quote", - "syn 2.0.17", + "syn 2.0.18", ] [[package]] @@ -749,12 +755,12 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.24" +version = "0.4.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e3c5919066adf22df73762e50cffcde3a758f2a848b113b586d1f86728b673b" +checksum = "ec837a71355b28f6556dbd569b37b3f363091c0bd4b2e735674521b4c5fd9bc5" dependencies = [ + "android-tzdata", "iana-time-zone", - "num-integer", "num-traits", "serde", "winapi", @@ -834,9 +840,9 @@ dependencies = [ [[package]] name = "comfy-table" -version = "6.1.4" +version = "6.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e7b787b0dc42e8111badfdbe4c3059158ccb2db8780352fa1b01e8ccf45cc4d" +checksum = "7e959d788268e3bf9d35ace83e81b124190378e4c91c9067524675e33394b8ba" dependencies = [ "strum", "strum_macros", @@ -923,9 +929,9 @@ dependencies = [ [[package]] name = "csv" -version = "1.2.1" +version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b015497079b9a9d69c02ad25de6c0a6edef051ea6360a327d0bd05802ef64ad" +checksum = "626ae34994d3d8d668f4269922248239db4ae42d538b14c398b74a52208e8086" dependencies = [ "csv-core", "itoa", @@ -1038,9 +1044,13 @@ dependencies = [ name = "datafusion-execution" version = "25.0.0" dependencies = [ + "arrow", + "chrono", "dashmap", "datafusion-common", "datafusion-expr", + "datafusion-physical-expr", + "futures", "hashbrown 0.13.2", "log", "object_store", @@ -1372,7 +1382,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.17", + "syn 2.0.18", ] [[package]] @@ -1831,12 +1841,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.17" +version = "0.4.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" -dependencies = [ - "cfg-if", -] +checksum = "518ef76f2f87365916b142844c16d8fefd85039bc5699050210a7778ee1cd1de" [[package]] name = "lz4" @@ -1910,14 +1917,13 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.6" +version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b9d9a46eff5b4ff64b45a9e316a6d1e0bc719ef429cbec4dc630684212bfdf9" +checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" dependencies = [ "libc", - "log", "wasi", - "windows-sys 0.45.0", + "windows-sys 0.48.0", ] [[package]] @@ -2061,9 +2067,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.17.1" +version = "1.17.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3" +checksum = "9670a07f94779e00908f3e686eab508878ebb390ba6e604d3a284c00e8d0487b" [[package]] name = "openssl-probe" @@ -2235,7 +2241,7 @@ checksum = "39407670928234ebc5e6e580247dd567ad73a3578460c5990f9503df207e8f07" dependencies = [ "proc-macro2", "quote", - "syn 2.0.17", + "syn 2.0.18", ] [[package]] @@ -2661,7 +2667,7 @@ checksum = "8c805777e3930c8883389c602315a24224bcc738b63905ef87cd1420353ea93e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.17", + "syn 2.0.18", ] [[package]] @@ -2843,9 +2849,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.17" +version = "2.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45b6ddbb36c5b969c182aec3c4a0bce7df3fbad4b77114706a49aacc80567388" +checksum = "32d41677bcbe24c20c52e7c70b0d8db04134c5d1066bf98662e2871ad200ea3e" dependencies = [ "proc-macro2", "quote", @@ -2897,7 +2903,7 @@ checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.17", + "syn 2.0.18", ] [[package]] @@ -2963,9 +2969,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.28.1" +version = "1.28.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0aa32867d44e6f2ce3385e89dceb990188b8bb0fb25b0cf576647a6f98ac5105" +checksum = "94d7b1cfd2aa4011f2de74c2c4c63665e27a71006b0a192dcd2710272e73dfa2" dependencies = [ "autocfg", "bytes", @@ -2987,7 +2993,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.17", + "syn 2.0.18", ] [[package]] @@ -3085,7 +3091,7 @@ checksum = "0f57e3ca2a01450b1a921183a9c9cbfda207fd822cef4ccb00a65402cbba7a74" dependencies = [ "proc-macro2", "quote", - "syn 2.0.17", + "syn 2.0.18", ] [[package]] @@ -3249,7 +3255,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.17", + "syn 2.0.18", "wasm-bindgen-shared", ] @@ -3283,7 +3289,7 @@ checksum = "e128beba882dd1eb6200e1dc92ae6c5dbaa4311aa7bb211ca035779e5efc39f8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.17", + "syn 2.0.18", "wasm-bindgen-backend", "wasm-bindgen-shared", ] diff --git a/datafusion/core/src/physical_plan/display.rs b/datafusion/core/src/physical_plan/display.rs index 5f286eed185c..3e0173cd52e0 100644 --- a/datafusion/core/src/physical_plan/display.rs +++ b/datafusion/core/src/physical_plan/display.rs @@ -25,12 +25,8 @@ use crate::logical_expr::{StringifiedPlan, ToStringifiedPlan}; use super::{accept, ExecutionPlan, ExecutionPlanVisitor}; -/// Options for controlling how each [`ExecutionPlan`] should format itself -#[derive(Debug, Clone, Copy)] -pub enum DisplayFormatType { - /// Default, compact format. Example: `FilterExec: c12 < 10.0` - Default, -} +// backward compatibility +pub use datafusion_execution::plan::DisplayFormatType; /// Wraps an `ExecutionPlan` with various ways to display this plan pub struct DisplayableExecutionPlan<'a> { diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index 2c3a3e9b4908..f86ff792838c 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -18,7 +18,6 @@ //! Traits for physical query plan, supporting parallel execution for partitioned relations. pub use self::metrics::Metric; -use self::metrics::MetricsSet; use self::{ coalesce_partitions::CoalescePartitionsExec, display::DisplayableExecutionPlan, }; @@ -29,33 +28,23 @@ use crate::physical_plan::expressions::PhysicalSortExpr; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; -use datafusion_common::utils::DataPtr; pub use datafusion_expr::Accumulator; pub use datafusion_expr::ColumnarValue; pub use datafusion_physical_expr::aggregate::row_accumulator::RowAccumulator; use datafusion_physical_expr::equivalence::OrderingEquivalenceProperties; pub use display::DisplayFormatType; use futures::stream::{Stream, TryStreamExt}; -use std::fmt; -use std::fmt::Debug; -use datafusion_common::tree_node::Transformed; use datafusion_common::DataFusionError; +use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use std::{any::Any, pin::Pin}; - -/// Trait for types that stream [arrow::record_batch::RecordBatch] -pub trait RecordBatchStream: Stream> { - /// Returns the schema of this `RecordBatchStream`. - /// - /// Implementation of this trait should guarantee that all `RecordBatch`'s returned by this - /// stream should have the same schema as returned from this method. - fn schema(&self) -> SchemaRef; -} -/// Trait for a stream of record batches. -pub type SendableRecordBatchStream = Pin>; +// backwards compatibility +pub use datafusion_execution::{ + ExecutionPlan, RecordBatchStream, SendableRecordBatchStream, +}; +pub use datafusion_physical_expr::{Distribution, Partitioning}; /// EmptyRecordBatchStream can be used to create a RecordBatchStream /// that will produce no results @@ -91,156 +80,6 @@ impl Stream for EmptyRecordBatchStream { /// Physical planner interface pub use self::planner::PhysicalPlanner; -/// `ExecutionPlan` represent nodes in the DataFusion Physical Plan. -/// -/// Each `ExecutionPlan` is partition-aware and is responsible for -/// creating the actual `async` [`SendableRecordBatchStream`]s -/// of [`RecordBatch`] that incrementally compute the operator's -/// output from its input partition. -/// -/// [`ExecutionPlan`] can be displayed in a simplified form using the -/// return value from [`displayable`] in addition to the (normally -/// quite verbose) `Debug` output. -pub trait ExecutionPlan: Debug + Send + Sync { - /// Returns the execution plan as [`Any`](std::any::Any) so that it can be - /// downcast to a specific implementation. - fn as_any(&self) -> &dyn Any; - - /// Get the schema for this execution plan - fn schema(&self) -> SchemaRef; - - /// Specifies the output partitioning scheme of this plan - fn output_partitioning(&self) -> Partitioning; - - /// Specifies whether this plan generates an infinite stream of records. - /// If the plan does not support pipelining, but its input(s) are - /// infinite, returns an error to indicate this. - fn unbounded_output(&self, _children: &[bool]) -> Result { - Ok(false) - } - - /// If the output of this operator within each partition is sorted, - /// returns `Some(keys)` with the description of how it was sorted. - /// - /// For example, Sort, (obviously) produces sorted output as does - /// SortPreservingMergeStream. Less obviously `Projection` - /// produces sorted output if its input was sorted as it does not - /// reorder the input rows, - /// - /// It is safe to return `None` here if your operator does not - /// have any particular output order here - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>; - - /// Specifies the data distribution requirements for all the - /// children for this operator, By default it's [[Distribution::UnspecifiedDistribution]] for each child, - fn required_input_distribution(&self) -> Vec { - vec![Distribution::UnspecifiedDistribution; self.children().len()] - } - - /// Specifies the ordering requirements for all of the children - /// For each child, it's the local ordering requirement within - /// each partition rather than the global ordering - /// - /// NOTE that checking `!is_empty()` does **not** check for a - /// required input ordering. Instead, the correct check is that at - /// least one entry must be `Some` - fn required_input_ordering(&self) -> Vec>> { - vec![None; self.children().len()] - } - - /// Returns `false` if this operator's implementation may reorder - /// rows within or between partitions. - /// - /// For example, Projection, Filter, and Limit maintain the order - /// of inputs -- they may transform values (Projection) or not - /// produce the same number of rows that went in (Filter and - /// Limit), but the rows that are produced go in the same way. - /// - /// DataFusion uses this metadata to apply certain optimizations - /// such as automatically repartitioning correctly. - /// - /// The default implementation returns `false` - /// - /// WARNING: if you override this default, you *MUST* ensure that - /// the operator's maintains the ordering invariant or else - /// DataFusion may produce incorrect results. - fn maintains_input_order(&self) -> Vec { - vec![false; self.children().len()] - } - - /// Returns `true` if this operator would benefit from - /// partitioning its input (and thus from more parallelism). For - /// operators that do very little work the overhead of extra - /// parallelism may outweigh any benefits - /// - /// The default implementation returns `true` unless this operator - /// has signalled it requires a single child input partition. - fn benefits_from_input_partitioning(&self) -> bool { - // By default try to maximize parallelism with more CPUs if - // possible - !self - .required_input_distribution() - .into_iter() - .any(|dist| matches!(dist, Distribution::SinglePartition)) - } - - /// Get the EquivalenceProperties within the plan - fn equivalence_properties(&self) -> EquivalenceProperties { - EquivalenceProperties::new(self.schema()) - } - - /// Get the OrderingEquivalenceProperties within the plan - fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties { - OrderingEquivalenceProperties::new(self.schema()) - } - - /// Get a list of child execution plans that provide the input for this plan. The returned list - /// will be empty for leaf nodes, will contain a single value for unary nodes, or two - /// values for binary nodes (such as joins). - fn children(&self) -> Vec>; - - /// Returns a new plan where all children were replaced by new plans. - fn with_new_children( - self: Arc, - children: Vec>, - ) -> Result>; - - /// creates an iterator - fn execute( - &self, - partition: usize, - context: Arc, - ) -> Result; - - /// Return a snapshot of the set of [`Metric`]s for this - /// [`ExecutionPlan`]. - /// - /// While the values of the metrics in the returned - /// [`MetricsSet`]s may change as execution progresses, the - /// specific metrics will not. - /// - /// Once `self.execute()` has returned (technically the future is - /// resolved) for all available partitions, the set of metrics - /// should be complete. If this function is called prior to - /// `execute()` new metrics may appear in subsequent calls. - fn metrics(&self) -> Option { - None - } - - /// Format this `ExecutionPlan` to `f` in the specified type. - /// - /// Should not include a newline - /// - /// Note this function prints a placeholder by default to preserve - /// backwards compatibility. - fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "ExecutionPlan(PlaceHolder)") - } - - /// Returns the global output statistics for this `ExecutionPlan` node. - fn statistics(&self) -> Statistics; -} - /// Indicate whether a data exchange is needed for the input of `plan`, which will be very helpful /// especially for the distributed engine to judge whether need to deal with shuffling. /// Currently there are 3 kinds of execution plan which needs data exchange @@ -269,28 +108,8 @@ pub fn need_data_exchange(plan: Arc) -> bool { } } -/// Returns a copy of this plan if we change any child according to the pointer comparison. -/// The size of `children` must be equal to the size of `ExecutionPlan::children()`. -pub fn with_new_children_if_necessary( - plan: Arc, - children: Vec>, -) -> Result>> { - let old_children = plan.children(); - if children.len() != old_children.len() { - Err(DataFusionError::Internal( - "Wrong number of children".to_string(), - )) - } else if children.is_empty() - || children - .iter() - .zip(old_children.iter()) - .any(|(c1, c2)| !Arc::data_ptr_eq(c1, c2)) - { - Ok(Transformed::Yes(plan.with_new_children(children)?)) - } else { - Ok(Transformed::No(plan)) - } -} +// backwards compatibility +pub use datafusion_execution::plan::with_new_children_if_necessary; /// Return a [wrapper](DisplayableExecutionPlan) around an /// [`ExecutionPlan`] which can be displayed in various easier to @@ -477,105 +296,6 @@ pub fn execute_stream_partitioned( Ok(streams) } -/// Partitioning schemes supported by operators. -#[derive(Debug, Clone)] -pub enum Partitioning { - /// Allocate batches using a round-robin algorithm and the specified number of partitions - RoundRobinBatch(usize), - /// Allocate rows based on a hash of one of more expressions and the specified number of - /// partitions - Hash(Vec>, usize), - /// Unknown partitioning scheme with a known number of partitions - UnknownPartitioning(usize), -} - -impl Partitioning { - /// Returns the number of partitions in this partitioning scheme - pub fn partition_count(&self) -> usize { - use Partitioning::*; - match self { - RoundRobinBatch(n) | Hash(_, n) | UnknownPartitioning(n) => *n, - } - } - - /// Returns true when the guarantees made by this [[Partitioning]] are sufficient to - /// satisfy the partitioning scheme mandated by the `required` [[Distribution]] - pub fn satisfy EquivalenceProperties>( - &self, - required: Distribution, - equal_properties: F, - ) -> bool { - match required { - Distribution::UnspecifiedDistribution => true, - Distribution::SinglePartition if self.partition_count() == 1 => true, - Distribution::HashPartitioned(required_exprs) => { - match self { - // Here we do not check the partition count for hash partitioning and assumes the partition count - // and hash functions in the system are the same. In future if we plan to support storage partition-wise joins, - // then we need to have the partition count and hash functions validation. - Partitioning::Hash(partition_exprs, _) => { - let fast_match = - expr_list_eq_strict_order(&required_exprs, partition_exprs); - // If the required exprs do not match, need to leverage the eq_properties provided by the child - // and normalize both exprs based on the eq_properties - if !fast_match { - let eq_properties = equal_properties(); - let eq_classes = eq_properties.classes(); - if !eq_classes.is_empty() { - let normalized_required_exprs = required_exprs - .iter() - .map(|e| { - normalize_expr_with_equivalence_properties( - e.clone(), - eq_classes, - ) - }) - .collect::>(); - let normalized_partition_exprs = partition_exprs - .iter() - .map(|e| { - normalize_expr_with_equivalence_properties( - e.clone(), - eq_classes, - ) - }) - .collect::>(); - expr_list_eq_strict_order( - &normalized_required_exprs, - &normalized_partition_exprs, - ) - } else { - fast_match - } - } else { - fast_match - } - } - _ => false, - } - } - _ => false, - } - } -} - -impl PartialEq for Partitioning { - fn eq(&self, other: &Partitioning) -> bool { - match (self, other) { - ( - Partitioning::RoundRobinBatch(count1), - Partitioning::RoundRobinBatch(count2), - ) if count1 == count2 => true, - (Partitioning::Hash(exprs1, count1), Partitioning::Hash(exprs2, count2)) - if expr_list_eq_strict_order(exprs1, exprs2) && (count1 == count2) => - { - true - } - _ => false, - } - } -} - /// Retrieves the ordering equivalence properties for a given schema and output ordering. pub fn ordering_equivalence_properties_helper( schema: SchemaRef, @@ -613,40 +333,11 @@ pub fn ordering_equivalence_properties_helper( oep } -/// Distribution schemes -#[derive(Debug, Clone)] -pub enum Distribution { - /// Unspecified distribution - UnspecifiedDistribution, - /// A single partition is required - SinglePartition, - /// Requires children to be distributed in such a way that the same - /// values of the keys end up in the same partition - HashPartitioned(Vec>), -} - -impl Distribution { - /// Creates a Partitioning for this Distribution to satisfy itself - pub fn create_partitioning(&self, partition_count: usize) -> Partitioning { - match self { - Distribution::UnspecifiedDistribution => { - Partitioning::UnknownPartitioning(partition_count) - } - Distribution::SinglePartition => Partitioning::UnknownPartitioning(1), - Distribution::HashPartitioned(expr) => { - Partitioning::Hash(expr.clone(), partition_count) - } - } - } -} - use datafusion_physical_expr::expressions::Column; pub use datafusion_physical_expr::window::WindowExpr; -use datafusion_physical_expr::{ - expr_list_eq_strict_order, normalize_expr_with_equivalence_properties, LexOrdering, -}; +use datafusion_physical_expr::EquivalenceProperties; +use datafusion_physical_expr::LexOrdering; pub use datafusion_physical_expr::{AggregateExpr, PhysicalExpr}; -use datafusion_physical_expr::{EquivalenceProperties, PhysicalSortRequirement}; /// Applies an optional projection to a [`SchemaRef`], returning the /// projected schema @@ -702,20 +393,22 @@ pub mod insert; pub mod joins; pub mod limit; pub mod memory; -pub mod metrics; pub mod planner; pub mod projection; pub mod repartition; pub mod sorts; pub mod stream; pub mod streaming; -pub mod tree_node; pub mod udaf; pub mod union; pub mod unnest; pub mod values; pub mod windows; +// backwards compatibility +pub use datafusion_execution::metrics; +pub use datafusion_execution::tree_node; + use crate::execution::context::TaskContext; use crate::physical_plan::common::AbortOnDropSingle; use crate::physical_plan::repartition::RepartitionExec; diff --git a/datafusion/execution/Cargo.toml b/datafusion/execution/Cargo.toml index 55a7c29974c2..6d8144cdde5a 100644 --- a/datafusion/execution/Cargo.toml +++ b/datafusion/execution/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "datafusion-execution" -description = "Execution configuration support for DataFusion query engine" +description = "Execution configuration and traits for DataFusion query engine" keywords = [ "arrow", "query", "sql" ] version = { workspace = true } edition = { workspace = true } @@ -33,9 +33,13 @@ name = "datafusion_execution" path = "src/lib.rs" [dependencies] +arrow = { workspace = true } +chrono = { version = "0.4.23", default-features = false } dashmap = "5.4.0" datafusion-common = { path = "../common", version = "25.0.0" } datafusion-expr = { path = "../expr", version = "25.0.0" } +datafusion-physical-expr = { path = "../physical-expr", version = "25.0.0", default-features = false } +futures = "0.3" hashbrown = { version = "0.13", features = ["raw"] } log = "^0.4" object_store = "0.5.4" diff --git a/datafusion/execution/src/lib.rs b/datafusion/execution/src/lib.rs index 07357e0579d1..e0d408e35990 100644 --- a/datafusion/execution/src/lib.rs +++ b/datafusion/execution/src/lib.rs @@ -20,9 +20,15 @@ pub mod config; pub mod disk_manager; pub mod memory_pool; +pub mod metrics; pub mod object_store; +pub mod plan; pub mod registry; pub mod runtime_env; +mod stream; mod task; +pub mod tree_node; +pub use plan::ExecutionPlan; +pub use stream::{RecordBatchStream, SendableRecordBatchStream}; pub use task::TaskContext; diff --git a/datafusion/core/src/physical_plan/metrics/baseline.rs b/datafusion/execution/src/metrics/baseline.rs similarity index 98% rename from datafusion/core/src/physical_plan/metrics/baseline.rs rename to datafusion/execution/src/metrics/baseline.rs index fbbb689aeef4..015fe9977cde 100644 --- a/datafusion/core/src/physical_plan/metrics/baseline.rs +++ b/datafusion/execution/src/metrics/baseline.rs @@ -22,14 +22,14 @@ use std::task::Poll; use arrow::record_batch::RecordBatch; use super::{Count, ExecutionPlanMetricsSet, MetricBuilder, Time, Timestamp}; -use crate::error::Result; +use datafusion_common::Result; /// Helper for creating and tracking common "baseline" metrics for /// each operator /// /// Example: /// ``` -/// use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet}; +/// use datafusion_execution::metrics::{BaselineMetrics, ExecutionPlanMetricsSet}; /// let metrics = ExecutionPlanMetricsSet::new(); /// /// let partition = 2; diff --git a/datafusion/core/src/physical_plan/metrics/builder.rs b/datafusion/execution/src/metrics/builder.rs similarity index 99% rename from datafusion/core/src/physical_plan/metrics/builder.rs rename to datafusion/execution/src/metrics/builder.rs index 30e9764c6446..b8d89762883f 100644 --- a/datafusion/core/src/physical_plan/metrics/builder.rs +++ b/datafusion/execution/src/metrics/builder.rs @@ -29,7 +29,7 @@ use super::{ /// case of constant strings /// /// ```rust -/// use datafusion::physical_plan::metrics::*; +/// use datafusion_execution::metrics::*; /// /// let metrics = ExecutionPlanMetricsSet::new(); /// let partition = 1; diff --git a/datafusion/core/src/physical_plan/metrics/mod.rs b/datafusion/execution/src/metrics/mod.rs similarity index 99% rename from datafusion/core/src/physical_plan/metrics/mod.rs rename to datafusion/execution/src/metrics/mod.rs index 652c0af5c2e4..ad16f3c0b249 100644 --- a/datafusion/core/src/physical_plan/metrics/mod.rs +++ b/datafusion/execution/src/metrics/mod.rs @@ -43,7 +43,7 @@ pub use value::{Count, Gauge, MetricValue, ScopedTimerGuard, Time, Timestamp}; /// [`ExecutionPlanMetricsSet`]. /// /// ``` -/// use datafusion::physical_plan::metrics::*; +/// use datafusion_execution::metrics::*; /// /// let metrics = ExecutionPlanMetricsSet::new(); /// assert!(metrics.clone_inner().output_rows().is_none()); diff --git a/datafusion/core/src/physical_plan/metrics/value.rs b/datafusion/execution/src/metrics/value.rs similarity index 100% rename from datafusion/core/src/physical_plan/metrics/value.rs rename to datafusion/execution/src/metrics/value.rs diff --git a/datafusion/execution/src/plan.rs b/datafusion/execution/src/plan.rs new file mode 100644 index 000000000000..0391b0545f7e --- /dev/null +++ b/datafusion/execution/src/plan.rs @@ -0,0 +1,214 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::datatypes::SchemaRef; +use core::fmt; +use datafusion_common::{ + tree_node::Transformed, utils::DataPtr, DataFusionError, Result, Statistics, +}; +use datafusion_physical_expr::{ + Distribution, EquivalenceProperties, OrderingEquivalenceProperties, Partitioning, + PhysicalSortExpr, PhysicalSortRequirement, +}; +use std::{any::Any, fmt::Debug, sync::Arc}; + +use crate::{metrics::MetricsSet, stream::SendableRecordBatchStream, TaskContext}; + +/// Options for controlling how each [`ExecutionPlan`] should format itself +#[derive(Debug, Clone, Copy)] +pub enum DisplayFormatType { + /// Default, compact format. Example: `FilterExec: c12 < 10.0` + Default, +} + +/// `ExecutionPlan` represent nodes in the DataFusion Physical Plan. +/// +/// Each `ExecutionPlan` is partition-aware and is responsible for +/// creating the actual `async` [`SendableRecordBatchStream`]s +/// of [`RecordBatch`] that incrementally compute the operator's +/// output from its input partition. +/// +/// [`ExecutionPlan`] can be displayed in a simplified form using the +/// return value from [`displayable`] in addition to the (normally +/// quite verbose) `Debug` output. +/// +/// [`RecordBatch`]: arrow::record_batch::RecordBatch +/// [`displayable`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/fn.displayable.html +pub trait ExecutionPlan: Debug + Send + Sync { + /// Returns the execution plan as [`Any`](std::any::Any) so that it can be + /// downcast to a specific implementation. + fn as_any(&self) -> &dyn Any; + + /// Get the schema for this execution plan + fn schema(&self) -> SchemaRef; + + /// Specifies the output partitioning scheme of this plan + fn output_partitioning(&self) -> Partitioning; + + /// Specifies whether this plan generates an infinite stream of records. + /// If the plan does not support pipelining, but its input(s) are + /// infinite, returns an error to indicate this. + fn unbounded_output(&self, _children: &[bool]) -> Result { + Ok(false) + } + + /// If the output of this operator within each partition is sorted, + /// returns `Some(keys)` with the description of how it was sorted. + /// + /// For example, Sort, (obviously) produces sorted output as does + /// SortPreservingMergeStream. Less obviously `Projection` + /// produces sorted output if its input was sorted as it does not + /// reorder the input rows, + /// + /// It is safe to return `None` here if your operator does not + /// have any particular output order here + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>; + + /// Specifies the data distribution requirements for all the + /// children for this operator, By default it's [[Distribution::UnspecifiedDistribution]] for each child, + fn required_input_distribution(&self) -> Vec { + vec![Distribution::UnspecifiedDistribution; self.children().len()] + } + + /// Specifies the ordering requirements for all of the children + /// For each child, it's the local ordering requirement within + /// each partition rather than the global ordering + /// + /// NOTE that checking `!is_empty()` does **not** check for a + /// required input ordering. Instead, the correct check is that at + /// least one entry must be `Some` + fn required_input_ordering(&self) -> Vec>> { + vec![None; self.children().len()] + } + + /// Returns `false` if this operator's implementation may reorder + /// rows within or between partitions. + /// + /// For example, Projection, Filter, and Limit maintain the order + /// of inputs -- they may transform values (Projection) or not + /// produce the same number of rows that went in (Filter and + /// Limit), but the rows that are produced go in the same way. + /// + /// DataFusion uses this metadata to apply certain optimizations + /// such as automatically repartitioning correctly. + /// + /// The default implementation returns `false` + /// + /// WARNING: if you override this default, you *MUST* ensure that + /// the operator's maintains the ordering invariant or else + /// DataFusion may produce incorrect results. + fn maintains_input_order(&self) -> Vec { + vec![false; self.children().len()] + } + + /// Returns `true` if this operator would benefit from + /// partitioning its input (and thus from more parallelism). For + /// operators that do very little work the overhead of extra + /// parallelism may outweigh any benefits + /// + /// The default implementation returns `true` unless this operator + /// has signalled it requires a single child input partition. + fn benefits_from_input_partitioning(&self) -> bool { + // By default try to maximize parallelism with more CPUs if + // possible + !self + .required_input_distribution() + .into_iter() + .any(|dist| matches!(dist, Distribution::SinglePartition)) + } + + /// Get the EquivalenceProperties within the plan + fn equivalence_properties(&self) -> EquivalenceProperties { + EquivalenceProperties::new(self.schema()) + } + + /// Get the OrderingEquivalenceProperties within the plan + fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties { + OrderingEquivalenceProperties::new(self.schema()) + } + + /// Get a list of child execution plans that provide the input for this plan. The returned list + /// will be empty for leaf nodes, will contain a single value for unary nodes, or two + /// values for binary nodes (such as joins). + fn children(&self) -> Vec>; + + /// Returns a new plan where all children were replaced by new plans. + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result>; + + /// creates an iterator + fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result; + + /// Return a snapshot of the set of [`Metric`]s for this + /// [`ExecutionPlan`]. + /// + /// While the values of the metrics in the returned + /// [`MetricsSet`]s may change as execution progresses, the + /// specific metrics will not. + /// + /// Once `self.execute()` has returned (technically the future is + /// resolved) for all available partitions, the set of metrics + /// should be complete. If this function is called prior to + /// `execute()` new metrics may appear in subsequent calls. + /// + /// [`Metric`]: crate::metrics::Metric + fn metrics(&self) -> Option { + None + } + + /// Format this `ExecutionPlan` to `f` in the specified type. + /// + /// Should not include a newline + /// + /// Note this function prints a placeholder by default to preserve + /// backwards compatibility. + fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "ExecutionPlan(PlaceHolder)") + } + + /// Returns the global output statistics for this `ExecutionPlan` node. + fn statistics(&self) -> Statistics; +} + +/// Returns a copy of this plan if we change any child according to the pointer comparison. +/// The size of `children` must be equal to the size of `ExecutionPlan::children()`. +pub fn with_new_children_if_necessary( + plan: Arc, + children: Vec>, +) -> Result>> { + let old_children = plan.children(); + if children.len() != old_children.len() { + Err(DataFusionError::Internal( + "Wrong number of children".to_string(), + )) + } else if children.is_empty() + || children + .iter() + .zip(old_children.iter()) + .any(|(c1, c2)| !Arc::data_ptr_eq(c1, c2)) + { + Ok(Transformed::Yes(plan.with_new_children(children)?)) + } else { + Ok(Transformed::No(plan)) + } +} diff --git a/datafusion/execution/src/stream.rs b/datafusion/execution/src/stream.rs new file mode 100644 index 000000000000..e05b69d704f7 --- /dev/null +++ b/datafusion/execution/src/stream.rs @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use futures::Stream; +use std::pin::Pin; + +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; +use datafusion_common::Result; + +/// Trait for types that stream [RecordBatch]es and know their schema +pub trait RecordBatchStream: Stream> { + /// Returns the schema of this `RecordBatchStream`. + /// + /// Implementation of this trait should guarantee that all `RecordBatch`'s returned by this + /// stream should have the same schema as returned from this method. + fn schema(&self) -> SchemaRef; +} + +/// Trait for a stream of record batches. +pub type SendableRecordBatchStream = Pin>; diff --git a/datafusion/core/src/physical_plan/tree_node.rs b/datafusion/execution/src/tree_node.rs similarity index 94% rename from datafusion/core/src/physical_plan/tree_node.rs rename to datafusion/execution/src/tree_node.rs index fad6508fdabe..513b197db91a 100644 --- a/datafusion/core/src/physical_plan/tree_node.rs +++ b/datafusion/execution/src/tree_node.rs @@ -17,7 +17,7 @@ //! This module provides common traits for visiting or rewriting tree nodes easily. -use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan}; +use crate::plan::{with_new_children_if_necessary, ExecutionPlan}; use datafusion_common::tree_node::{DynTreeNode, Transformed}; use datafusion_common::Result; use std::sync::Arc; diff --git a/datafusion/physical-expr/src/distribution.rs b/datafusion/physical-expr/src/distribution.rs new file mode 100644 index 000000000000..7a1ea778515f --- /dev/null +++ b/datafusion/physical-expr/src/distribution.rs @@ -0,0 +1,149 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use crate::{ + expr_list_eq_strict_order, normalize_expr_with_equivalence_properties, + EquivalenceProperties, PhysicalExpr, +}; + +/// Distribution schemes +#[derive(Debug, Clone)] +pub enum Distribution { + /// Unspecified distribution + UnspecifiedDistribution, + /// A single partition is required + SinglePartition, + /// Requires children to be distributed in such a way that the same + /// values of the keys end up in the same partition + HashPartitioned(Vec>), +} + +impl Distribution { + /// Creates a Partitioning for this Distribution to satisfy itself + pub fn create_partitioning(&self, partition_count: usize) -> Partitioning { + match self { + Distribution::UnspecifiedDistribution => { + Partitioning::UnknownPartitioning(partition_count) + } + Distribution::SinglePartition => Partitioning::UnknownPartitioning(1), + Distribution::HashPartitioned(expr) => { + Partitioning::Hash(expr.clone(), partition_count) + } + } + } +} + +/// Partitioning schemes supported by operators. +#[derive(Debug, Clone)] +pub enum Partitioning { + /// Allocate batches using a round-robin algorithm and the specified number of partitions + RoundRobinBatch(usize), + /// Allocate rows based on a hash of one of more expressions and the specified number of + /// partitions + Hash(Vec>, usize), + /// Unknown partitioning scheme with a known number of partitions + UnknownPartitioning(usize), +} + +impl Partitioning { + /// Returns the number of partitions in this partitioning scheme + pub fn partition_count(&self) -> usize { + use Partitioning::*; + match self { + RoundRobinBatch(n) | Hash(_, n) | UnknownPartitioning(n) => *n, + } + } + + /// Returns true when the guarantees made by this [[Partitioning]] are sufficient to + /// satisfy the partitioning scheme mandated by the `required` [[Distribution]] + pub fn satisfy EquivalenceProperties>( + &self, + required: Distribution, + equal_properties: F, + ) -> bool { + match required { + Distribution::UnspecifiedDistribution => true, + Distribution::SinglePartition if self.partition_count() == 1 => true, + Distribution::HashPartitioned(required_exprs) => { + match self { + // Here we do not check the partition count for hash partitioning and assumes the partition count + // and hash functions in the system are the same. In future if we plan to support storage partition-wise joins, + // then we need to have the partition count and hash functions validation. + Partitioning::Hash(partition_exprs, _) => { + let fast_match = + expr_list_eq_strict_order(&required_exprs, partition_exprs); + // If the required exprs do not match, need to leverage the eq_properties provided by the child + // and normalize both exprs based on the eq_properties + if !fast_match { + let eq_properties = equal_properties(); + let eq_classes = eq_properties.classes(); + if !eq_classes.is_empty() { + let normalized_required_exprs = required_exprs + .iter() + .map(|e| { + normalize_expr_with_equivalence_properties( + e.clone(), + eq_classes, + ) + }) + .collect::>(); + let normalized_partition_exprs = partition_exprs + .iter() + .map(|e| { + normalize_expr_with_equivalence_properties( + e.clone(), + eq_classes, + ) + }) + .collect::>(); + expr_list_eq_strict_order( + &normalized_required_exprs, + &normalized_partition_exprs, + ) + } else { + fast_match + } + } else { + fast_match + } + } + _ => false, + } + } + _ => false, + } + } +} + +impl PartialEq for Partitioning { + fn eq(&self, other: &Partitioning) -> bool { + match (self, other) { + ( + Partitioning::RoundRobinBatch(count1), + Partitioning::RoundRobinBatch(count2), + ) if count1 == count2 => true, + (Partitioning::Hash(exprs1, count1), Partitioning::Hash(exprs2, count2)) + if expr_list_eq_strict_order(exprs1, exprs2) && (count1 == count2) => + { + true + } + _ => false, + } + } +} diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index b54bcda601c7..213358dd7894 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -21,6 +21,7 @@ pub mod conditional_expressions; #[cfg(feature = "crypto_expressions")] pub mod crypto_expressions; pub mod datetime_expressions; +mod distribution; pub mod equivalence; pub mod execution_props; pub mod expressions; @@ -45,9 +46,9 @@ pub mod utils; pub mod var_provider; pub mod window; -// reexport this to maintain compatibility with anything that used from_slice previously pub use aggregate::AggregateExpr; pub use datafusion_common::from_slice; +pub use distribution::{Distribution, Partitioning}; pub use equivalence::{ project_equivalence_properties, project_ordering_equivalence_properties, EquivalenceProperties, EquivalentClass, OrderedColumn, OrderingEquivalenceProperties,