Skip to content

Commit 7a063d8

Browse files
authored
Support repartitioned() method in RepartitionExec (#17990)
* Support repartitioned() method in RepartitionExec
* Add tests
1 parent dd68592 commit 7a063d8

File tree

1 file changed

+44
-5
lines changed
  • datafusion/physical-plan/src/repartition

1 file changed

+44
-5
lines changed

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -858,6 +858,27 @@ impl ExecutionPlan for RepartitionExec {
858858
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
859859
Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
860860
}
861+
862+
fn repartitioned(
863+
&self,
864+
target_partitions: usize,
865+
_config: &ConfigOptions,
866+
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
867+
use Partitioning::*;
868+
let mut new_properties = self.cache.clone();
869+
new_properties.partitioning = match new_properties.partitioning {
870+
RoundRobinBatch(_) => RoundRobinBatch(target_partitions),
871+
Hash(hash, _) => Hash(hash, target_partitions),
872+
UnknownPartitioning(_) => UnknownPartitioning(target_partitions),
873+
};
874+
Ok(Some(Arc::new(Self {
875+
input: Arc::clone(&self.input),
876+
state: Arc::clone(&self.state),
877+
metrics: self.metrics.clone(),
878+
preserve_order: self.preserve_order,
879+
cache: new_properties,
880+
})))
881+
}
861882
}
862883

863884
impl RepartitionExec {
@@ -1762,8 +1783,7 @@ mod test {
17621783
///
17631784
macro_rules! assert_plan {
17641785
($EXPECTED_PLAN_LINES: expr, $PLAN: expr) => {
1765-
let physical_plan = $PLAN;
1766-
let formatted = crate::displayable(&physical_plan).indent(true).to_string();
1786+
let formatted = crate::displayable($PLAN).indent(true).to_string();
17671787
let actual: Vec<&str> = formatted.trim().lines().collect();
17681788

17691789
let expected_plan_lines: Vec<&str> = $EXPECTED_PLAN_LINES
@@ -1794,7 +1814,7 @@ mod test {
17941814
" DataSourceExec: partitions=1, partition_sizes=[0], output_ordering=c0@0 ASC",
17951815
" DataSourceExec: partitions=1, partition_sizes=[0], output_ordering=c0@0 ASC",
17961816
];
1797-
assert_plan!(expected_plan, exec);
1817+
assert_plan!(expected_plan, &exec);
17981818
Ok(())
17991819
}
18001820

@@ -1813,7 +1833,7 @@ mod test {
18131833
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
18141834
" DataSourceExec: partitions=1, partition_sizes=[0], output_ordering=c0@0 ASC",
18151835
];
1816-
assert_plan!(expected_plan, exec);
1836+
assert_plan!(expected_plan, &exec);
18171837
Ok(())
18181838
}
18191839

@@ -1834,7 +1854,26 @@ mod test {
18341854
" DataSourceExec: partitions=1, partition_sizes=[0]",
18351855
" DataSourceExec: partitions=1, partition_sizes=[0]",
18361856
];
1837-
assert_plan!(expected_plan, exec);
1857+
assert_plan!(expected_plan, &exec);
1858+
Ok(())
1859+
}
1860+
1861+
#[tokio::test]
1862+
async fn test_repartition() -> Result<()> {
1863+
let schema = test_schema();
1864+
let sort_exprs = sort_exprs(&schema);
1865+
let source = sorted_memory_exec(&schema, sort_exprs);
1866+
// output is sorted, but has only a single partition, so no need to sort
1867+
let exec = RepartitionExec::try_new(source, Partitioning::RoundRobinBatch(10))?
1868+
.repartitioned(20, &Default::default())?
1869+
.unwrap();
1870+
1871+
// Repartition should not preserve order
1872+
let expected_plan = [
1873+
"RepartitionExec: partitioning=RoundRobinBatch(20), input_partitions=1",
1874+
" DataSourceExec: partitions=1, partition_sizes=[0], output_ordering=c0@0 ASC",
1875+
];
1876+
assert_plan!(expected_plan, exec.as_ref());
18381877
Ok(())
18391878
}
18401879

0 commit comments

Comments
 (0)