Skip to content

Commit d633a19

Browse files
committed
Add display implementation for equivalence properties, sort, etc
1 parent abe68ed commit d633a19

File tree

5 files changed

+227
-9
lines changed

5 files changed

+227
-9
lines changed

datafusion/physical-expr-common/src/physical_expr.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,3 +223,22 @@ pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
223223
any
224224
}
225225
}
226+
227+
/// Writes a list of [`PhysicalExpr`]s to a `std::fmt::Formatter`.
228+
///
229+
/// Example output: `[a + 1, b]`
230+
pub fn format_physical_expr_list(
231+
f: &mut std::fmt::Formatter<'_>,
232+
ordering: &[Arc<dyn PhysicalExpr>],
233+
) -> std::fmt::Result {
234+
let mut iter = ordering.iter();
235+
write!(f, "[")?;
236+
if let Some(expr) = iter.next() {
237+
write!(f, "{}", expr)?;
238+
}
239+
for expr in iter {
240+
write!(f, ", {}", expr)?;
241+
}
242+
write!(f, "]")?;
243+
Ok(())
244+
}

datafusion/physical-expr-common/src/sort_expr.rs

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,51 @@ use datafusion_common::Result;
3030
use datafusion_expr_common::columnar_value::ColumnarValue;
3131

3232
/// Represents Sort operation for a column in a RecordBatch
33+
///
34+
/// Example:
35+
/// ```no_run
36+
/// # use std::any::Any;
37+
/// # use std::fmt::Display;
38+
/// # use std::hash::Hasher;
39+
/// # use std::sync::Arc;
40+
/// # use arrow::array::RecordBatch;
41+
/// # use datafusion_common::Result;
42+
/// # use arrow::compute::SortOptions;
43+
/// # use arrow::datatypes::{DataType, Schema};
44+
/// # use datafusion_expr_common::columnar_value::ColumnarValue;
45+
/// # use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
46+
/// # use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
47+
/// # // this crate doesn't have a physical expression implementation
48+
/// # // so make a really simple one
49+
/// # #[derive(Clone, Debug, PartialEq, Eq, Hash)]
50+
/// # struct MyPhysicalExpr;
51+
/// # impl PhysicalExpr for MyPhysicalExpr {
52+
/// # fn as_any(&self) -> &dyn Any {todo!() }
53+
/// # fn data_type(&self, input_schema: &Schema) -> Result<DataType> {todo!()}
54+
/// # fn nullable(&self, input_schema: &Schema) -> Result<bool> {todo!() }
55+
/// # fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {todo!() }
56+
/// # fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {todo!()}
57+
/// # fn with_new_children(self: Arc<Self>, children: Vec<Arc<dyn PhysicalExpr>>) -> Result<Arc<dyn PhysicalExpr>> {todo!()}
58+
/// # fn dyn_hash(&self, _state: &mut dyn Hasher) {todo!()}
59+
/// # }
60+
/// # impl Display for MyPhysicalExpr {
61+
/// # fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!(f, "a") }
62+
/// # }
63+
/// # impl PartialEq<dyn Any> for MyPhysicalExpr {
64+
/// # fn eq(&self, _other: &dyn Any) -> bool { true }
65+
/// # }
66+
/// # fn col(name: &str) -> Arc<dyn PhysicalExpr> { Arc::new(MyPhysicalExpr) }
67+
/// // Sort by a ASC
68+
/// let options = SortOptions::default();
69+
/// let sort_expr = PhysicalSortExpr::new(col("a"), options);
70+
/// assert_eq!(sort_expr.to_string(), "a ASC NULLS LAST");
71+
///
72+
/// // Sort by a DESC NULLS FIRST
73+
/// let sort_expr = PhysicalSortExpr::new_default(col("a"))
74+
/// .desc()
75+
/// .nulls_first();
76+
/// assert_eq!(sort_expr.to_string(), "a DESC NULLS FIRST");
77+
/// ```
3378
#[derive(Clone, Debug)]
3479
pub struct PhysicalSortExpr {
3580
/// Physical expression representing the column to sort
@@ -43,6 +88,35 @@ impl PhysicalSortExpr {
4388
pub fn new(expr: Arc<dyn PhysicalExpr>, options: SortOptions) -> Self {
4489
Self { expr, options }
4590
}
91+
92+
/// Create a new PhysicalSortExpr with default [`SortOptions`]
93+
pub fn new_default(expr: Arc<dyn PhysicalExpr>) -> Self {
94+
Self::new(expr, SortOptions::default())
95+
}
96+
97+
/// Set the sort sort options to ASC
98+
pub fn asc(mut self) -> Self {
99+
self.options.descending = false;
100+
self
101+
}
102+
103+
/// Set the sort sort options to DESC
104+
pub fn desc(mut self) -> Self {
105+
self.options.descending = true;
106+
self
107+
}
108+
109+
/// Set the sort sort options to NULLS FIRST
110+
pub fn nulls_first(mut self) -> Self {
111+
self.options.nulls_first = true;
112+
self
113+
}
114+
115+
/// Set the sort sort options to NULLS LAST
116+
pub fn nulls_last(mut self) -> Self {
117+
self.options.nulls_first = false;
118+
self
119+
}
46120
}
47121

48122
impl PartialEq for PhysicalSortExpr {
@@ -60,7 +134,7 @@ impl Hash for PhysicalSortExpr {
60134
}
61135
}
62136

63-
impl std::fmt::Display for PhysicalSortExpr {
137+
impl Display for PhysicalSortExpr {
64138
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
65139
write!(f, "{} {}", self.expr, to_str(&self.options))
66140
}

datafusion/physical-expr/src/equivalence/class.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::fmt::Display;
1819
use std::sync::Arc;
1920

2021
use super::{add_offset_to_expr, collapse_lex_req, ProjectionMapping};
@@ -27,6 +28,7 @@ use crate::{
2728

2829
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
2930
use datafusion_common::JoinType;
31+
use datafusion_physical_expr_common::physical_expr::format_physical_expr_list;
3032

3133
#[derive(Debug, Clone)]
3234
/// A structure representing an expression known to be constant in a physical execution plan.
@@ -101,6 +103,19 @@ impl ConstExpr {
101103
}
102104
}
103105

106+
/// Display implementation for `ConstExpr`
107+
///
108+
/// Example `c` or `c(across_partitions)`
109+
impl Display for ConstExpr {
110+
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
111+
write!(f, "{}", self.expr)?;
112+
if self.across_partitions {
113+
write!(f, "(across_partitions)")?;
114+
}
115+
Ok(())
116+
}
117+
}
118+
104119
impl From<Arc<dyn PhysicalExpr>> for ConstExpr {
105120
fn from(expr: Arc<dyn PhysicalExpr>) -> Self {
106121
Self::new(expr)
@@ -224,6 +239,14 @@ impl EquivalenceClass {
224239
}
225240
}
226241

242+
impl Display for EquivalenceClass {
243+
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
244+
write!(f, "[")?;
245+
format_physical_expr_list(f, &self.exprs)?;
246+
write!(f, "]")
247+
}
248+
}
249+
227250
/// An `EquivalenceGroup` is a collection of `EquivalenceClass`es where each
228251
/// class represents a distinct equivalence class in a relation.
229252
#[derive(Debug, Clone)]
@@ -575,6 +598,20 @@ impl EquivalenceGroup {
575598
}
576599
}
577600

601+
impl Display for EquivalenceGroup {
602+
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
603+
write!(f, "[")?;
604+
let mut iter = self.iter();
605+
if let Some(cls) = iter.next() {
606+
write!(f, "{}", cls)?;
607+
}
608+
for cls in iter {
609+
write!(f, ", {}", cls)?;
610+
}
611+
write!(f, "]")
612+
}
613+
}
614+
578615
#[cfg(test)]
579616
mod tests {
580617

datafusion/physical-expr/src/equivalence/ordering.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,13 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::fmt::Display;
1819
use std::hash::Hash;
1920
use std::sync::Arc;
2021

21-
use arrow_schema::SortOptions;
22-
2322
use crate::equivalence::add_offset_to_expr;
2423
use crate::{LexOrdering, PhysicalExpr, PhysicalSortExpr};
24+
use arrow_schema::SortOptions;
2525

2626
/// An `OrderingEquivalenceClass` object keeps track of different alternative
2727
/// orderings that can describe a schema. For example, consider the following table:
@@ -219,6 +219,21 @@ fn resolve_overlap(orderings: &mut [LexOrdering], idx: usize, pre_idx: usize) ->
219219
false
220220
}
221221

222+
impl Display for OrderingEquivalenceClass {
223+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
224+
write!(f, "[")?;
225+
let mut iter = self.orderings.iter();
226+
if let Some(ordering) = iter.next() {
227+
write!(f, "{}", PhysicalSortExpr::format_list(ordering))?;
228+
}
229+
for ordering in iter {
230+
write!(f, "{}", PhysicalSortExpr::format_list(ordering))?;
231+
}
232+
write!(f, "]")?;
233+
Ok(())
234+
}
235+
}
236+
222237
#[cfg(test)]
223238
mod tests {
224239
use std::sync::Arc;

datafusion/physical-expr/src/equivalence/properties.rs

Lines changed: 79 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::fmt::Display;
1819
use std::hash::{Hash, Hasher};
1920
use std::sync::Arc;
2021

@@ -71,7 +72,7 @@ use itertools::Itertools;
7172
/// specifies the output sorted by EITHER `a ASC` or `b DESC`, the sort can be
7273
/// avoided.
7374
///
74-
/// # Example equivalent equivalent expressions
75+
/// # Example equivalent expressions
7576
///
7677
/// Similarly, consider the table below:
7778
///
@@ -92,6 +93,32 @@ use itertools::Itertools;
9293
/// `Hash(a)` and output partitioning is `Hash(b)`, then DataFusion avoids
9394
/// repartitioning the data as the existing partitioning satisfies the
9495
/// requirement.
96+
///
97+
/// # Code Example
98+
/// ```
99+
/// use std::sync::Arc;
100+
/// use arrow_schema::{Schema, Field, DataType, SchemaRef};
101+
/// use datafusion_physical_expr::{ConstExpr, EquivalenceProperties};
102+
/// use datafusion_physical_expr::expressions::col;
103+
///
104+
/// let schema: SchemaRef = Arc::new(Schema::new(vec![
105+
/// Field::new("a", DataType::Int32, false),
106+
/// Field::new("b", DataType::Int32, false),
107+
/// Field::new("c", DataType::Int32, false),
108+
/// ]));
109+
///
110+
/// let col_a = col("a", &schema).unwrap();
111+
/// let col_b = col("b", &schema).unwrap();
112+
/// let col_c = col("c", &schema).unwrap();
113+
///
114+
/// // This object represents data that is sorted by a ASC, c DESC
115+
/// // with a single constant value of b
116+
/// let mut eq_properties = EquivalenceProperties::new(schema);
117+
/// eq_properties.add_constants(vec![ConstExpr::from(col_b)]);
118+
/// eq_properties.add_new_ordering(vec![col_a.asc(), col_c.desc()]);
119+
///
120+
/// assert_eq!(eq_properties.to_string(), "ff")
121+
/// ```
95122
#[derive(Debug, Clone)]
96123
pub struct EquivalenceProperties {
97124
/// Collection of equivalence classes that store expressions with the same
@@ -203,6 +230,11 @@ impl EquivalenceProperties {
203230
self.oeq_class.add_new_orderings(orderings);
204231
}
205232

233+
/// Adds a single ordering to the existing ordering equivalence class.
234+
pub fn add_new_ordering(&mut self, ordering: LexOrdering) {
235+
self.add_new_orderings([ordering]);
236+
}
237+
206238
/// Incorporates the given equivalence group into the existing
207239
/// equivalence group within.
208240
pub fn add_equivalence_group(&mut self, other_eq_group: EquivalenceGroup) {
@@ -1059,6 +1091,37 @@ impl EquivalenceProperties {
10591091
}
10601092
}
10611093

1094+
/// More readable display version of the `EquivalenceProperties`.
1095+
///
1096+
/// Format:
1097+
/// ```text
1098+
/// order: [[a ASC, b ASC], [a ASC, c ASC]], eq: [[a = b], [a = c]], const: [a = 1]
1099+
/// ```
1100+
impl Display for EquivalenceProperties {
1101+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1102+
if self.eq_group.is_empty()
1103+
&& self.oeq_class.is_empty()
1104+
&& self.constants.is_empty()
1105+
{
1106+
return write!(f, "No properties");
1107+
}
1108+
if !self.oeq_class.is_empty() {
1109+
write!(f, "order: {}", self.oeq_class)?;
1110+
}
1111+
if !self.eq_group.is_empty() {
1112+
write!(f, ", eq: {}", self.eq_group)?;
1113+
}
1114+
if !self.constants.is_empty() {
1115+
write!(f, ", const: [")?;
1116+
for c in &self.constants {
1117+
write!(f, ", {}", c)?;
1118+
}
1119+
write!(f, "]")?;
1120+
}
1121+
Ok(())
1122+
}
1123+
}
1124+
10621125
/// Calculates the properties of a given [`ExprPropertiesNode`].
10631126
///
10641127
/// Order information can be retrieved as:
@@ -3032,21 +3095,31 @@ mod tests {
30323095
// Check whether constants are same
30333096
let lhs_constants = lhs.constants();
30343097
let rhs_constants = rhs.constants();
3035-
assert_eq!(lhs_constants.len(), rhs_constants.len(), "{}", err_msg);
3098+
assert_eq!(
3099+
lhs_constants.len(),
3100+
rhs_constants.len(),
3101+
"{err_msg}\nlhs: {lhs}\nrhs: {rhs}"
3102+
);
30363103
for rhs_constant in rhs_constants {
30373104
assert!(
30383105
const_exprs_contains(lhs_constants, rhs_constant.expr()),
3039-
"{}",
3040-
err_msg
3106+
"{err_msg}\nlhs: {lhs}\nrhs: {rhs}"
30413107
);
30423108
}
30433109

30443110
// Check whether orderings are same.
30453111
let lhs_orderings = lhs.oeq_class();
30463112
let rhs_orderings = &rhs.oeq_class.orderings;
3047-
assert_eq!(lhs_orderings.len(), rhs_orderings.len(), "{}", err_msg);
3113+
assert_eq!(
3114+
lhs_orderings.len(),
3115+
rhs_orderings.len(),
3116+
"{err_msg}\nlhs: {lhs}\nrhs: {rhs}"
3117+
);
30483118
for rhs_ordering in rhs_orderings {
3049-
assert!(lhs_orderings.contains(rhs_ordering), "{}", err_msg);
3119+
assert!(
3120+
lhs_orderings.contains(rhs_ordering),
3121+
"{err_msg}\nlhs: {lhs}\nrhs: {rhs}"
3122+
);
30503123
}
30513124
}
30523125
}

0 commit comments

Comments
 (0)