Skip to content

Commit

Permalink
Test SortPreservingMerge with different RecordBatch sizes
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed May 26, 2021
1 parent 270ca62 commit 29c767b
Showing 1 changed file with 6 additions and 12 deletions.
18 changes: 6 additions & 12 deletions datafusion/src/physical_plan/sort_preserving_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,7 @@ mod tests {

async fn sorted_partitioned_input(
sort: Vec<PhysicalSortExpr>,
sizes: &[usize],
) -> Arc<dyn ExecutionPlan> {
let schema = test::aggr_test_schema();
let partitions = 4;
Expand All @@ -735,16 +736,9 @@ mod tests {
);

let sorted = basic_sort(csv, sort).await;
let split = split_batch(&sorted, 10);
let split: Vec<_> = sizes.iter().map(|x| split_batch(&sorted, *x)).collect();

Arc::new(
MemoryExec::try_new(
&[split.clone(), split.clone(), split],
sorted.schema(),
None,
)
.unwrap(),
)
Arc::new(MemoryExec::try_new(&split, sorted.schema(), None).unwrap())
}

#[tokio::test]
Expand Down Expand Up @@ -772,7 +766,7 @@ mod tests {
},
];

let input = sorted_partitioned_input(sort.clone()).await;
let input = sorted_partitioned_input(sort.clone(), &[10, 3, 11]).await;
let basic = basic_sort(input.clone(), sort.clone()).await;
let partition = sorted_merge(input, sort).await;

Expand Down Expand Up @@ -800,7 +794,7 @@ mod tests {
},
];

let input = sorted_partitioned_input(sort.clone()).await;
let input = sorted_partitioned_input(sort.clone(), &[10, 5, 13]).await;
let basic = basic_sort(input.clone(), sort.clone()).await;

let merge = Arc::new(SortPreservingMergeExec::new(sort, input, 23));
Expand Down Expand Up @@ -908,7 +902,7 @@ mod tests {
options: SortOptions::default(),
}];

let batches = sorted_partitioned_input(sort.clone()).await;
let batches = sorted_partitioned_input(sort.clone(), &[5, 7, 3]).await;

let partition_count = batches.output_partitioning().partition_count();
let mut tasks = Vec::with_capacity(partition_count);
Expand Down

0 comments on commit 29c767b

Please sign in to comment.