Skip to content

Commit

Permalink
Make required_guarantees output to be deterministic (#12484)
Browse files Browse the repository at this point in the history
* Make output deterministic by adding a sort

Signed-off-by: Austin Liu <austin362667@gmail.com>

Make output deterministic by adding a sort

Signed-off-by: Austin Liu <austin362667@gmail.com>

Format

Signed-off-by: Austin Liu <austin362667@gmail.com>

* Add deterministic test for `required_guarantees`

Signed-off-by: Austin Liu <austin362667@gmail.com>

---------

Signed-off-by: Austin Liu <austin362667@gmail.com>
  • Loading branch information
austin362667 authored Sep 18, 2024
1 parent ce1091c commit c763fda
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 15 deletions.
119 changes: 114 additions & 5 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -613,14 +613,16 @@ impl DisplayAs for ParquetExec {
.pruning_predicate
.as_ref()
.map(|pre| {
let mut guarantees = pre
.literal_guarantees()
.iter()
.map(|item| format!("{}", item))
.collect_vec();
guarantees.sort();
format!(
", pruning_predicate={}, required_guarantees=[{}]",
pre.predicate_expr(),
pre.literal_guarantees()
.iter()
.map(|item| format!("{}", item))
.collect_vec()
.join(", ")
guarantees.join(", ")
)
})
.unwrap_or_default();
Expand Down Expand Up @@ -1808,6 +1810,26 @@ mod tests {
create_batch(vec![("c1", c1.clone())])
}

/// Builds a [`RecordBatch`] with three Int64 columns named `a`, `b` and `c`.
/// All three columns share the same contents:
/// "[-1, 1, null, 2, 3, null, null]"
fn int64_batch() -> RecordBatch {
    let values: Vec<Option<i64>> =
        vec![Some(-1), Some(1), None, Some(2), Some(3), None, None];
    let shared: ArrayRef = Arc::new(Int64Array::from(values));

    // Each column is just another handle to the same underlying array.
    create_batch(vec![
        ("a", Arc::clone(&shared)),
        ("b", Arc::clone(&shared)),
        ("c", shared),
    ])
}

#[tokio::test]
async fn parquet_exec_metrics() {
// batch1: c1(string)
Expand Down Expand Up @@ -1873,6 +1895,93 @@ mod tests {
assert_contains!(&display, "projection=[c1]");
}

/// Verifies that the `required_guarantees` portion of the `ParquetExec`
/// display output is deterministic: it must be identical on every rendering
/// of the same predicate, and it must not depend on the order in which the
/// conjuncts of the predicate were written.
#[tokio::test]
async fn parquet_exec_display_deterministic() {
    /// Returns whatever follows the first `required_guarantees=` marker in
    /// `s` (up to the next marker, if there is one), or `None` when the
    /// marker does not occur at all.
    fn extract_required_guarantees(s: &str) -> Option<&str> {
        let mut pieces = s.split("required_guarantees=");
        // Discard the text that precedes the marker.
        pieces.next()?;
        pieces.next()
    }

    // input batch with columns: a(int64), b(int64), c(int64)
    let input = int64_batch();

    // Render the very same filter many times over; the required_guarantees
    // must come out identically in every display plan.
    for _ in 0..100 {
        // c = 1 AND b = 1 AND a = 1
        let c_is_one = col("c").eq(lit(1));
        let b_is_one = col("b").eq(lit(1));
        let a_is_one = col("a").eq(lit(1));
        let predicate = c_is_one.and(b_is_one).and(a_is_one);

        let repeated = RoundTrip::new()
            .with_predicate(predicate)
            .with_pushdown_predicate()
            .round_trip(vec![input.clone()])
            .await;

        let pruning_predicate = &repeated.parquet_exec.pruning_predicate;
        assert!(pruning_predicate.is_some());

        let plan_text = displayable(repeated.parquet_exec.as_ref())
            .indent(true)
            .to_string();

        let rendered: &str = extract_required_guarantees(&plan_text)
            .expect("Failed to extract required_guarantees");
        // Only the required_guarantees part is compared, because the
        // file_groups part will not be the same.
        assert_eq!(
            rendered.trim(),
            "[a in (1), b in (1), c in (1)]",
            "required_guarantees don't match"
        );
    }

    // Same three conjuncts, written in two different orders.

    // c = 1 AND a = 1 AND b = 1
    let c_first = col("c")
        .eq(lit(1))
        .and(col("a").eq(lit(1)))
        .and(col("b").eq(lit(1)));

    let c_first_trip = RoundTrip::new()
        .with_predicate(c_first)
        .with_pushdown_predicate()
        .round_trip(vec![input.clone()])
        .await;

    // b = 1 AND a = 1 AND c = 1
    let b_first = col("b")
        .eq(lit(1))
        .and(col("a").eq(lit(1)))
        .and(col("c").eq(lit(1)));

    let b_first_trip = RoundTrip::new()
        .with_predicate(b_first)
        .with_pushdown_predicate()
        .round_trip(vec![input])
        .await;

    // both plans should have a pruning predicate
    let pruning_predicate = &c_first_trip.parquet_exec.pruning_predicate;
    assert!(pruning_predicate.is_some());
    let pruning_predicate = &b_first_trip.parquet_exec.pruning_predicate;
    assert!(pruning_predicate.is_some());

    // convert both to explain plan form
    let c_first_text = displayable(c_first_trip.parquet_exec.as_ref())
        .indent(true)
        .to_string();
    let b_first_text = displayable(b_first_trip.parquet_exec.as_ref())
        .indent(true)
        .to_string();

    let c_first_guarantees = extract_required_guarantees(&c_first_text)
        .expect("Failed to extract required_guarantees");
    let b_first_guarantees = extract_required_guarantees(&b_first_text)
        .expect("Failed to extract required_guarantees");

    // Only the required_guarantees part is compared, because the predicate
    // part will not be the same.
    assert_eq!(
        c_first_guarantees, b_first_guarantees,
        "required_guarantees don't match"
    );
}

#[tokio::test]
async fn parquet_exec_has_no_pruning_predicate_if_can_not_prune() {
// batch1: c1(string)
Expand Down
15 changes: 5 additions & 10 deletions datafusion/physical-expr/src/utils/guarantee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,26 +225,21 @@ impl LiteralGuarantee {

impl Display for LiteralGuarantee {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
let mut sorted_literals: Vec<_> =
self.literals.iter().map(|lit| lit.to_string()).collect();
sorted_literals.sort();
match self.guarantee {
Guarantee::In => write!(
f,
"{} in ({})",
self.column.name,
self.literals
.iter()
.map(|lit| lit.to_string())
.collect::<Vec<_>>()
.join(", ")
sorted_literals.join(", ")
),
Guarantee::NotIn => write!(
f,
"{} not in ({})",
self.column.name,
self.literals
.iter()
.map(|lit| lit.to_string())
.collect::<Vec<_>>()
.join(", ")
sorted_literals.join(", ")
),
}
}
Expand Down

0 comments on commit c763fda

Please sign in to comment.