Commit ff718d0

Use fetch limit in get_sorted_iter (#3545)
* Add fetch, fix length

* Add fetch, fix length

* Simplify implementation a bit

* Simplify

* Doc

* Reorder

* Move parallel sort to planner

* Simplify a bit more

* Update datafusion/core/src/physical_plan/planner.rs

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Dandandan and alamb authored Sep 21, 2022
1 parent 0a2b0a7 commit ff718d0
Showing 4 changed files with 45 additions and 135 deletions.
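Context for the change: arrow's `lexsort_to_indices` kernel accepts an optional limit, so threading the query's fetch into `get_sorted_iter` lets the in-memory sort materialize only the top rows. A minimal standalone sketch of that kernel behavior (our own example, not code from this commit; assumes a recent arrow crate):

// Our own illustrative example: `lexsort_to_indices` takes `Option<usize>`
// as a limit, so passing the fetch through avoids building a full permutation.
use arrow::array::{ArrayRef, Int32Array};
use arrow::compute::{lexsort_to_indices, SortColumn, SortOptions};
use std::sync::Arc;

fn main() -> arrow::error::Result<()> {
    let values: ArrayRef = Arc::new(Int32Array::from(vec![5, 1, 4, 2, 3]));
    let columns = vec![SortColumn {
        values,
        options: Some(SortOptions { descending: false, nulls_first: true }),
    }];

    // Equivalent of `ORDER BY x ASC LIMIT 2`: only the two smallest rows' indices.
    let indices = lexsort_to_indices(&columns, Some(2))?;
    assert_eq!(indices.value(0), 1); // row holding 1
    assert_eq!(indices.value(1), 3); // row holding 2
    Ok(())
}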
3 changes: 0 additions & 3 deletions datafusion/core/src/execution/context.rs
@@ -40,7 +40,6 @@ use crate::{
     physical_optimizer::{
         aggregate_statistics::AggregateStatistics,
         hash_build_probe_order::HashBuildProbeOrder, optimizer::PhysicalOptimizerRule,
-        parallel_sort::ParallelSort,
     },
 };
 pub use datafusion_physical_expr::execution_props::ExecutionProps;
@@ -1472,8 +1471,6 @@ impl SessionState {
                 .unwrap(),
             )));
         }
-        physical_optimizers.push(Arc::new(ParallelSort::new()));
-
         physical_optimizers.push(Arc::new(Repartition::new()));
         physical_optimizers.push(Arc::new(AddCoalescePartitionsExec::new()));
 
93 changes: 0 additions & 93 deletions datafusion/core/src/physical_optimizer/parallel_sort.rs

This file was deleted.

19 changes: 17 additions & 2 deletions datafusion/core/src/physical_plan/planner.rs
@@ -18,6 +18,7 @@
 //! Physical query planner
 
 use super::analyze::AnalyzeExec;
+use super::sorts::sort_preserving_merge::SortPreservingMergeExec;
 use super::{
     aggregates, empty::EmptyExec, hash_join::PartitionMode, udaf, union::UnionExec,
     values::ValuesExec, windows,
@@ -841,8 +842,22 @@ impl DefaultPhysicalPlanner {
                         )),
                     })
                     .collect::<Result<Vec<_>>>()?;
-                Ok(Arc::new(SortExec::try_new(sort_expr, physical_input, *fetch)?))
-            }
+                // If we have a `LIMIT`, we can run the sort/limit in parallel (similar to TopK)
+                Ok(if fetch.is_some() && session_state.config.target_partitions > 1 {
+                    let sort = SortExec::new_with_partitioning(
+                        sort_expr,
+                        physical_input,
+                        true,
+                        *fetch,
+                    );
+                    let merge = SortPreservingMergeExec::new(
+                        sort.expr().to_vec(),
+                        Arc::new(sort),
+                    );
+                    Arc::new(merge)
+                } else {
+                    Arc::new(SortExec::try_new(sort_expr, physical_input, *fetch)?)
+                })
+            }
             LogicalPlan::Join(Join {
                 left,
                 right,
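What the new planner branch does, in miniature: when a fetch limit is present and `session_state.config.target_partitions > 1`, each partition sorts with the limit applied and a `SortPreservingMergeExec` combines the per-partition results. A self-contained sketch of that TopK-style shape (plain vectors and our own names, not DataFusion's API):

// Illustrative only: per-partition "SortExec with fetch" followed by a
// "SortPreservingMergeExec"-style merge that keeps the global top k.
fn top_k_merge(partitions: Vec<Vec<i32>>, k: usize) -> Vec<i32> {
    // Each partition sorts locally and keeps only its own top k rows.
    let locally_sorted: Vec<Vec<i32>> = partitions
        .into_iter()
        .map(|mut p| {
            p.sort_unstable();
            p.truncate(k);
            p
        })
        .collect();

    // Merge the already-sorted runs and keep the global top k.
    let mut merged: Vec<i32> = locally_sorted.into_iter().flatten().collect();
    merged.sort_unstable(); // a real merge would exploit the sorted runs
    merged.truncate(k);
    merged
}

fn main() {
    let parts = vec![vec![7, 3, 9], vec![1, 8, 2], vec![6, 4, 5]];
    assert_eq!(top_k_merge(parts, 3), vec![1, 2, 3]);
    println!("ok");
}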
65 changes: 28 additions & 37 deletions datafusion/core/src/physical_plan/sorts/sort.rs
@@ -38,7 +38,7 @@ use crate::physical_plan::{
     RecordBatchStream, SendableRecordBatchStream, Statistics,
 };
 use crate::prelude::SessionConfig;
-use arrow::array::{make_array, Array, ArrayRef, MutableArrayData, UInt32Array};
+use arrow::array::{make_array, Array, ArrayRef, MutableArrayData};
 pub use arrow::compute::SortOptions;
 use arrow::compute::{concat, lexsort_to_indices, take, SortColumn, TakeOptions};
 use arrow::datatypes::SchemaRef;
@@ -152,6 +152,7 @@ impl ExternalSorter {
             &self.expr,
             batch_size,
             tracking_metrics,
+            self.fetch,
         )?;
         let prev_used = self.free_all_memory();
         streams.push(SortedStream::new(in_mem_stream, prev_used));
@@ -183,6 +184,7 @@
             &self.expr,
             batch_size,
             tracking_metrics,
+            self.fetch,
         );
         // Report to the memory manager we are no longer using memory
         self.free_all_memory();
@@ -273,6 +275,7 @@ impl MemoryConsumer for ExternalSorter {
             &self.expr,
             self.session_config.batch_size(),
             tracking_metrics,
+            self.fetch,
         );
 
         spill_partial_sorted_stream(&mut stream?, spillfile.path(), self.schema.clone())
@@ -289,13 +292,14 @@ impl MemoryConsumer for ExternalSorter {
     }
 }
 
-/// consume the non-empty `sorted_bathes` and do in_mem_sort
+/// consume the non-empty `sorted_batches` and do in_mem_sort
 fn in_mem_partial_sort(
     buffered_batches: &mut Vec<BatchWithSortArray>,
     schema: SchemaRef,
     expressions: &[PhysicalSortExpr],
     batch_size: usize,
     tracking_metrics: MemTrackingMetrics,
+    fetch: Option<usize>,
 ) -> Result<SendableRecordBatchStream> {
     assert_ne!(buffered_batches.len(), 0);
     if buffered_batches.len() == 1 {
@@ -323,7 +327,7 @@ fn in_mem_partial_sort(
         // NB timer records time taken on drop, so there are no
         // calls to `timer.done()` below.
         let _timer = tracking_metrics.elapsed_compute().timer();
-        get_sorted_iter(&sorted_arrays, expressions, batch_size)?
+        get_sorted_iter(&sorted_arrays, expressions, batch_size, fetch)?
     };
     Ok(Box::pin(SortedSizedRecordBatchStream::new(
         schema,
@@ -345,6 +349,7 @@ fn get_sorted_iter(
     sort_arrays: &[Vec<ArrayRef>],
     expr: &[PhysicalSortExpr],
     batch_size: usize,
+    fetch: Option<usize>,
 ) -> Result<SortedIterator> {
     let row_indices = sort_arrays
         .iter()
@@ -374,44 +379,38 @@ fn get_sorted_iter(
         })
     })
     .collect::<Result<Vec<_>>>()?;
-    let indices = lexsort_to_indices(&sort_columns, None)?;
+    let indices = lexsort_to_indices(&sort_columns, fetch)?;
 
-    Ok(SortedIterator::new(indices, row_indices, batch_size))
+    // Calculate composite index based on sorted indices
+    let row_indices = indices
+        .values()
+        .iter()
+        .map(|i| row_indices[*i as usize])
+        .collect();
+
+    Ok(SortedIterator::new(row_indices, batch_size))
 }
 
 struct SortedIterator {
     /// Current logical position in the iterator
     pos: usize,
-    /// Indexes into the input representing the correctly sorted total output
-    indices: UInt32Array,
-    /// Map each logical input index to where it can be found in the sorted input batches
+    /// Sorted composite index of where to find the rows in buffered batches
    composite: Vec<CompositeIndex>,
     /// Maximum batch size to produce
     batch_size: usize,
-    /// total length of the iterator
-    length: usize,
 }
 
 impl SortedIterator {
-    fn new(
-        indices: UInt32Array,
-        composite: Vec<CompositeIndex>,
-        batch_size: usize,
-    ) -> Self {
-        let length = composite.len();
+    fn new(composite: Vec<CompositeIndex>, batch_size: usize) -> Self {
         Self {
             pos: 0,
-            indices,
             composite,
             batch_size,
-            length,
         }
     }
 
     fn memory_size(&self) -> usize {
-        std::mem::size_of_val(self)
-            + self.indices.get_array_memory_size()
-            + std::mem::size_of_val(&self.composite[..])
+        std::mem::size_of_val(self) + std::mem::size_of_val(&self.composite[..])
     }
 }
 
@@ -420,33 +419,25 @@ impl Iterator for SortedIterator {
 
     /// Emit a max of `batch_size` positions each time
     fn next(&mut self) -> Option<Self::Item> {
-        if self.pos >= self.length {
+        let length = self.composite.len();
+        if self.pos >= length {
             return None;
         }
 
-        let current_size = min(self.batch_size, self.length - self.pos);
+        let current_size = min(self.batch_size, length - self.pos);
 
         // Combine adjacent indexes from the same batch to make a slice,
         // for more efficient `extend` later.
-        let mut last_batch_idx = 0;
-        let mut indices_in_batch = vec![];
+        let mut last_batch_idx = self.composite[self.pos].batch_idx;
+        let mut indices_in_batch = Vec::with_capacity(current_size);
 
         let mut slices = vec![];
-        for i in 0..current_size {
-            let p = self.pos + i;
-            let c_index = self.indices.value(p) as usize;
-            let ci = self.composite[c_index];
-
-            if indices_in_batch.is_empty() {
-                last_batch_idx = ci.batch_idx;
-                indices_in_batch.push(ci.row_idx);
-            } else if ci.batch_idx == last_batch_idx {
-                indices_in_batch.push(ci.row_idx);
-            } else {
+        for ci in &self.composite[self.pos..self.pos + current_size] {
+            if ci.batch_idx != last_batch_idx {
                 group_indices(last_batch_idx, &mut indices_in_batch, &mut slices);
                 last_batch_idx = ci.batch_idx;
-                indices_in_batch.push(ci.row_idx);
             }
+            indices_in_batch.push(ci.row_idx);
         }
 
         assert!(
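The rewritten `SortedIterator::next` above no longer looks rows up through a separate `UInt32Array`; the composite index is pre-sorted in `get_sorted_iter`, so the loop only coalesces consecutive rows from the same batch into slices. A standalone sketch of that grouping step (our own types, mirroring `CompositeIndex` and `group_indices` from the diff):

// Walk composite indices already in output order and coalesce runs from the
// same source batch, so each run can later be copied with one `extend`
// instead of row by row. Types and names here are illustrative.
#[derive(Clone, Copy, Debug, PartialEq)]
struct CompositeIndex {
    batch_idx: u32,
    row_idx: u32,
}

/// Group consecutive entries sharing a batch into (batch_idx, row_idxs) runs.
fn group_by_batch(composite: &[CompositeIndex]) -> Vec<(u32, Vec<u32>)> {
    let mut runs: Vec<(u32, Vec<u32>)> = vec![];
    let mut last_batch_idx = match composite.first() {
        Some(ci) => ci.batch_idx,
        None => return runs,
    };
    let mut indices_in_batch: Vec<u32> = vec![];

    for ci in composite {
        if ci.batch_idx != last_batch_idx {
            // Close the current run, exactly like `group_indices` in the diff.
            runs.push((last_batch_idx, std::mem::take(&mut indices_in_batch)));
            last_batch_idx = ci.batch_idx;
        }
        indices_in_batch.push(ci.row_idx);
    }
    runs.push((last_batch_idx, indices_in_batch));
    runs
}

fn main() {
    let c = |b, r| CompositeIndex { batch_idx: b, row_idx: r };
    let sorted = [c(0, 2), c(0, 3), c(1, 0), c(1, 1), c(0, 5)];
    assert_eq!(
        group_by_batch(&sorted),
        vec![(0, vec![2, 3]), (1, vec![0, 1]), (0, vec![5])]
    );
    println!("ok");
}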
