Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor: Move Partitioning andDistribution to physical_expr #7238

Merged
merged 3 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ use crate::datasource::physical_plan::{
use crate::error::Result;
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
ordering_equivalence_properties_helper, DisplayAs, DisplayFormatType, ExecutionPlan,
Partitioning, SendableRecordBatchStream,
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
};
use arrow_schema::SchemaRef;
use datafusion_common::Statistics;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{
LexOrdering, OrderingEquivalenceProperties, PhysicalSortExpr,
ordering_equivalence_properties_helper, LexOrdering, OrderingEquivalenceProperties,
PhysicalSortExpr,
};
use futures::StreamExt;
use object_store::{GetResult, ObjectStore};
Expand Down
8 changes: 5 additions & 3 deletions datafusion/core/src/datasource/physical_plan/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ use crate::error::Result;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
ordering_equivalence_properties_helper, DisplayAs, DisplayFormatType, ExecutionPlan,
Partitioning, SendableRecordBatchStream, Statistics,
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
Statistics,
};
use datafusion_execution::TaskContext;

use arrow::datatypes::SchemaRef;
use datafusion_physical_expr::{LexOrdering, OrderingEquivalenceProperties};
use datafusion_physical_expr::{
ordering_equivalence_properties_helper, LexOrdering, OrderingEquivalenceProperties,
};

use std::any::Any;
use std::sync::Arc;
Expand Down
10 changes: 6 additions & 4 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ use crate::error::{DataFusionError, Result};
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
ordering_equivalence_properties_helper, DisplayAs, DisplayFormatType, ExecutionPlan,
Partitioning, SendableRecordBatchStream, Statistics,
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
Statistics,
};
use arrow::csv;
use arrow::datatypes::SchemaRef;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{LexOrdering, OrderingEquivalenceProperties};
use datafusion_physical_expr::{
ordering_equivalence_properties_helper, LexOrdering, OrderingEquivalenceProperties,
};
use tokio::io::AsyncWriteExt;

use super::FileScanConfig;
Expand Down Expand Up @@ -438,7 +440,7 @@ impl FileOpener for CsvOpener {
/// Consider the following partitions enclosed by braces `{}`:
///
/// {A,1,2,3,4,5,6,7,8,9\n
/// A,1,2,3,4,5,6,7,8,9\n}
/// A,1,2,3,4,5,6,7,8,9\n}
/// A,1,2,3,4,5,6,7,8,9\n
/// The lines read would be: [0, 1]
///
Expand Down
8 changes: 5 additions & 3 deletions datafusion/core/src/datasource/physical_plan/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,16 @@ use crate::error::{DataFusionError, Result};
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
ordering_equivalence_properties_helper, DisplayAs, DisplayFormatType, ExecutionPlan,
Partitioning, SendableRecordBatchStream, Statistics,
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
Statistics,
};
use datafusion_execution::TaskContext;

use arrow::json::ReaderBuilder;
use arrow::{datatypes::SchemaRef, json};
use datafusion_physical_expr::{LexOrdering, OrderingEquivalenceProperties};
use datafusion_physical_expr::{
ordering_equivalence_properties_helper, LexOrdering, OrderingEquivalenceProperties,
};

use bytes::{Buf, Bytes};
use futures::{ready, stream, StreamExt, TryStreamExt};
Expand Down
8 changes: 5 additions & 3 deletions datafusion/core/src/datasource/physical_plan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ use crate::{
physical_optimizer::pruning::PruningPredicate,
physical_plan::{
metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan,
Partitioning, SendableRecordBatchStream, Statistics,
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
Statistics,
},
};
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_expr::{
ordering_equivalence_properties_helper, PhysicalSortExpr,
};
use fmt::Debug;
use object_store::path::Path;
use std::any::Any;
Expand Down
174 changes: 4 additions & 170 deletions datafusion/core/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ pub use datafusion_expr::ColumnarValue;
use datafusion_physical_expr::equivalence::OrderingEquivalenceProperties;
pub use display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
use futures::stream::TryStreamExt;
use std::fmt;
use std::fmt::Debug;
use tokio::task::JoinSet;

Expand Down Expand Up @@ -373,177 +372,10 @@ pub fn execute_stream_partitioned(
Ok(streams)
}

/// Partitioning schemes supported by operators.
#[derive(Debug, Clone)]
pub enum Partitioning {
/// Allocate batches using a round-robin algorithm and the specified number of partitions
RoundRobinBatch(usize),
/// Allocate rows based on a hash of one of more expressions and the specified number of
/// partitions
Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
/// Unknown partitioning scheme with a known number of partitions
UnknownPartitioning(usize),
}

impl fmt::Display for Partitioning {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Partitioning::RoundRobinBatch(size) => write!(f, "RoundRobinBatch({size})"),
Partitioning::Hash(phy_exprs, size) => {
let phy_exprs_str = phy_exprs
.iter()
.map(|e| format!("{e}"))
.collect::<Vec<String>>()
.join(", ");
write!(f, "Hash([{phy_exprs_str}], {size})")
}
Partitioning::UnknownPartitioning(size) => {
write!(f, "UnknownPartitioning({size})")
}
}
}
}
impl Partitioning {
/// Returns the number of partitions in this partitioning scheme
pub fn partition_count(&self) -> usize {
use Partitioning::*;
match self {
RoundRobinBatch(n) | Hash(_, n) | UnknownPartitioning(n) => *n,
}
}

/// Returns true when the guarantees made by this [[Partitioning]] are sufficient to
/// satisfy the partitioning scheme mandated by the `required` [[Distribution]]
pub fn satisfy<F: FnOnce() -> EquivalenceProperties>(
&self,
required: Distribution,
equal_properties: F,
) -> bool {
match required {
Distribution::UnspecifiedDistribution => true,
Distribution::SinglePartition if self.partition_count() == 1 => true,
Distribution::HashPartitioned(required_exprs) => {
match self {
// Here we do not check the partition count for hash partitioning and assumes the partition count
// and hash functions in the system are the same. In future if we plan to support storage partition-wise joins,
// then we need to have the partition count and hash functions validation.
Partitioning::Hash(partition_exprs, _) => {
let fast_match =
expr_list_eq_strict_order(&required_exprs, partition_exprs);
// If the required exprs do not match, need to leverage the eq_properties provided by the child
// and normalize both exprs based on the eq_properties
if !fast_match {
let eq_properties = equal_properties();
let eq_classes = eq_properties.classes();
if !eq_classes.is_empty() {
let normalized_required_exprs = required_exprs
.iter()
.map(|e| {
normalize_expr_with_equivalence_properties(
e.clone(),
eq_classes,
)
})
.collect::<Vec<_>>();
let normalized_partition_exprs = partition_exprs
.iter()
.map(|e| {
normalize_expr_with_equivalence_properties(
e.clone(),
eq_classes,
)
})
.collect::<Vec<_>>();
expr_list_eq_strict_order(
&normalized_required_exprs,
&normalized_partition_exprs,
)
} else {
fast_match
}
} else {
fast_match
}
}
_ => false,
}
}
_ => false,
}
}
}

impl PartialEq for Partitioning {
fn eq(&self, other: &Partitioning) -> bool {
match (self, other) {
(
Partitioning::RoundRobinBatch(count1),
Partitioning::RoundRobinBatch(count2),
) if count1 == count2 => true,
(Partitioning::Hash(exprs1, count1), Partitioning::Hash(exprs2, count2))
if expr_list_eq_strict_order(exprs1, exprs2) && (count1 == count2) =>
{
true
}
_ => false,
}
}
}

/// Retrieves the ordering equivalence properties for a given schema and output ordering.
pub fn ordering_equivalence_properties_helper(
schema: SchemaRef,
eq_orderings: &[LexOrdering],
) -> OrderingEquivalenceProperties {
let mut oep = OrderingEquivalenceProperties::new(schema);
let first_ordering = if let Some(first) = eq_orderings.first() {
first
} else {
// Return an empty OrderingEquivalenceProperties:
return oep;
};
// First entry among eq_orderings is the head, skip it:
for ordering in eq_orderings.iter().skip(1) {
if !ordering.is_empty() {
oep.add_equal_conditions((first_ordering, ordering))
}
}
oep
}

/// Distribution schemes
#[derive(Debug, Clone)]
pub enum Distribution {
/// Unspecified distribution
UnspecifiedDistribution,
/// A single partition is required
SinglePartition,
/// Requires children to be distributed in such a way that the same
/// values of the keys end up in the same partition
HashPartitioned(Vec<Arc<dyn PhysicalExpr>>),
}

impl Distribution {
/// Creates a Partitioning for this Distribution to satisfy itself
pub fn create_partitioning(&self, partition_count: usize) -> Partitioning {
match self {
Distribution::UnspecifiedDistribution => {
Partitioning::UnknownPartitioning(partition_count)
}
Distribution::SinglePartition => Partitioning::UnknownPartitioning(1),
Distribution::HashPartitioned(expr) => {
Partitioning::Hash(expr.clone(), partition_count)
}
}
}
}

use datafusion_physical_expr::expressions::Column;
pub use datafusion_physical_expr::window::WindowExpr;
use datafusion_physical_expr::{
expr_list_eq_strict_order, normalize_expr_with_equivalence_properties, LexOrdering,
};
pub use datafusion_physical_expr::{AggregateExpr, PhysicalExpr};
pub use datafusion_physical_expr::{Distribution, Partitioning};
use datafusion_physical_expr::{EquivalenceProperties, PhysicalSortRequirement};

pub mod aggregates;
Expand Down Expand Up @@ -576,7 +408,9 @@ use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
pub use datafusion_common::utils::project_schema;
use datafusion_execution::TaskContext;
pub use datafusion_physical_expr::{expressions, functions, hash_utils, udf};
pub use datafusion_physical_expr::{
expressions, functions, hash_utils, ordering_equivalence_properties_helper, udf,
};

#[cfg(test)]
mod tests {
Expand Down
21 changes: 21 additions & 0 deletions datafusion/physical-expr/src/equivalence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,27 @@ pub fn update_ordering_equivalence_with_cast(
}
}

/// Retrieves the ordering equivalence properties for a given schema and output ordering.
pub fn ordering_equivalence_properties_helper(
schema: SchemaRef,
eq_orderings: &[LexOrdering],
) -> OrderingEquivalenceProperties {
let mut oep = OrderingEquivalenceProperties::new(schema);
let first_ordering = if let Some(first) = eq_orderings.first() {
first
} else {
// Return an empty OrderingEquivalenceProperties:
return oep;
};
// First entry among eq_orderings is the head, skip it:
for ordering in eq_orderings.iter().skip(1) {
if !ordering.is_empty() {
oep.add_equal_conditions((first_ordering, ordering))
}
}
oep
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
8 changes: 5 additions & 3 deletions datafusion/physical-expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub mod functions;
pub mod hash_utils;
pub mod intervals;
pub mod math_expressions;
mod partitioning;
mod physical_expr;
pub mod planner;
#[cfg(feature = "regex_expressions")]
Expand All @@ -54,10 +55,11 @@ pub use aggregate::AggregateExpr;
pub use analysis::{analyze, AnalysisContext, ExprBoundaries};

pub use equivalence::{
project_equivalence_properties, project_ordering_equivalence_properties,
EquivalenceProperties, EquivalentClass, OrderingEquivalenceProperties,
OrderingEquivalentClass,
ordering_equivalence_properties_helper, project_equivalence_properties,
project_ordering_equivalence_properties, EquivalenceProperties, EquivalentClass,
OrderingEquivalenceProperties, OrderingEquivalentClass,
};
pub use partitioning::{Distribution, Partitioning};
pub use physical_expr::{PhysicalExpr, PhysicalExprRef};
pub use planner::create_physical_expr;
pub use scalar_function::ScalarFunctionExpr;
Expand Down
Loading