Skip to content

Commit

Permalink
fix(7181): improve performance by using hasher on tuple (unqiue slice…
Browse files Browse the repository at this point in the history
…d record batch)
  • Loading branch information
wiedld committed Aug 29, 2023
1 parent 08347cf commit 76fee2d
Showing 1 changed file with 5 additions and 4 deletions.
9 changes: 5 additions & 4 deletions datafusion/core/src/physical_plan/sorts/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::physical_plan::sorts::builder::SortOrder;
use crate::physical_plan::sorts::cursor::{Cursor, FieldArray, FieldCursor, RowCursor};
use crate::physical_plan::SendableRecordBatchStream;
use crate::physical_plan::{PhysicalExpr, PhysicalSortExpr};
use ahash::RandomState;
use arrow::array::Array;
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
Expand Down Expand Up @@ -286,7 +287,7 @@ impl<C: Cursor> std::fmt::Debug for OffsetCursorStream<C> {

pub struct BatchTrackingStream<C: Cursor> {
/// Write once, read many [`RecordBatch`]s
batches: HashMap<Uuid, Arc<RecordBatch>>,
batches: HashMap<Uuid, Arc<RecordBatch>, RandomState>,
/// Input streams yielding [`Cursor`]s and [`RecordBatch`]es
streams: BatchCursorStream<C>,
/// Accounts for memory used by buffered batches
Expand All @@ -296,7 +297,7 @@ pub struct BatchTrackingStream<C: Cursor> {
impl<C: Cursor> BatchTrackingStream<C> {
pub fn new(streams: BatchCursorStream<C>, reservation: MemoryReservation) -> Self {
Self {
batches: HashMap::new(),
batches: HashMap::with_hasher(RandomState::new()),
streams,
reservation,
}
Expand Down Expand Up @@ -383,8 +384,8 @@ impl<C: Cursor + std::marker::Send> YieldedCursorStream<C> {
cursors: Vec<(C, Uuid, BatchOffset)>,
sort_order: Vec<SortOrder>,
) -> Result<()> {
let mut cursors_per_batch: HashMap<(Uuid, BatchOffset), C> =
HashMap::with_capacity(cursors.len());
let mut cursors_per_batch: HashMap<(Uuid, BatchOffset), C, RandomState> =
HashMap::with_capacity_and_hasher(cursors.len(), RandomState::new());
for (cursor, batch_id, batch_offset) in cursors {
cursors_per_batch.insert((batch_id, batch_offset), cursor);
}
Expand Down

0 comments on commit 76fee2d

Please sign in to comment.