Skip to content

Commit

Permalink
Revert "Use arrow row format in SortPreservingMerge (apache#3386)"
Browse files Browse the repository at this point in the history
This reverts commit 451e441.
  • Loading branch information
alamb committed Oct 11, 2022
1 parent 5e1fd47 commit d303625
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 112 deletions.
144 changes: 124 additions & 20 deletions datafusion/core/src/physical_plan/sorts/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,17 @@
// specific language governing permissions and limitations
// under the License.

use arrow::row::{Row, Rows};
use crate::error;
use crate::error::{DataFusionError, Result};
use crate::physical_plan::PhysicalExpr;
use arrow::array::{ArrayRef, DynComparator};
use arrow::compute::SortOptions;
use arrow::record_batch::RecordBatch;
use hashbrown::HashMap;
use parking_lot::RwLock;
use std::borrow::BorrowMut;
use std::cmp::Ordering;
use std::sync::Arc;

/// A `SortKeyCursor` is created from a `RecordBatch`, and a set of
/// `PhysicalExpr` that when evaluated on the `RecordBatch` yield the sort keys.
Expand All @@ -29,35 +38,54 @@ use std::cmp::Ordering;
/// a row comparator for each other cursor that it is compared to.
pub struct SortKeyCursor {
stream_idx: usize,
sort_columns: Vec<ArrayRef>,
cur_row: usize,
num_rows: usize,

// An id uniquely identifying the record batch scanned by this cursor.
batch_id: usize,

rows: Rows,
// A collection of comparators that compare rows in this cursor's batch to
// the cursors in other batches. Other batches are uniquely identified by
// their batch_idx.
batch_comparators: RwLock<HashMap<usize, Vec<DynComparator>>>,
sort_options: Arc<Vec<SortOptions>>,
}

impl std::fmt::Debug for SortKeyCursor {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("SortKeyCursor")
.field("sort_columns", &self.sort_columns)
.field("cur_row", &self.cur_row)
.field("num_rows", &self.num_rows)
.field("batch_id", &self.batch_id)
.field("batch_comparators", &"<FUNC>")
.finish()
}
}

impl SortKeyCursor {
/// Create a new SortKeyCursor
pub fn new(stream_idx: usize, batch_id: usize, rows: Rows) -> Self {
Self {
pub fn new(
stream_idx: usize,
batch_id: usize,
batch: &RecordBatch,
sort_key: &[Arc<dyn PhysicalExpr>],
sort_options: Arc<Vec<SortOptions>>,
) -> error::Result<Self> {
let sort_columns = sort_key
.iter()
.map(|expr| Ok(expr.evaluate(batch)?.into_array(batch.num_rows())))
.collect::<error::Result<_>>()?;
Ok(Self {
stream_idx,
cur_row: 0,
num_rows: rows.num_rows(),
num_rows: batch.num_rows(),
sort_columns,
batch_id,
rows,
}
batch_comparators: RwLock::new(HashMap::new()),
sort_options,
})
}

#[inline(always)]
Expand Down Expand Up @@ -87,30 +115,106 @@ impl SortKeyCursor {
t
}

/// Returns the current row
fn current(&self) -> Row<'_> {
self.rows.row(self.cur_row)
/// Compares the sort key pointed to by this instance's row cursor with that of another
pub fn compare(&self, other: &SortKeyCursor) -> error::Result<Ordering> {
if self.sort_columns.len() != other.sort_columns.len() {
return Err(DataFusionError::Internal(format!(
"SortKeyCursors had inconsistent column counts: {} vs {}",
self.sort_columns.len(),
other.sort_columns.len()
)));
}

if self.sort_columns.len() != self.sort_options.len() {
return Err(DataFusionError::Internal(format!(
"Incorrect number of SortOptions provided to SortKeyCursor::compare, expected {} got {}",
self.sort_columns.len(),
self.sort_options.len()
)));
}

let zipped: Vec<((&ArrayRef, &ArrayRef), &SortOptions)> = self
.sort_columns
.iter()
.zip(other.sort_columns.iter())
.zip(self.sort_options.iter())
.collect::<Vec<_>>();

self.init_cmp_if_needed(other, &zipped)?;
let map = self.batch_comparators.read();
let cmp = map.get(&other.batch_id).ok_or_else(|| {
DataFusionError::Execution(format!(
"Failed to find comparator for {} cmp {}",
self.batch_id, other.batch_id
))
})?;

for (i, ((l, r), sort_options)) in zipped.iter().enumerate() {
match (l.is_valid(self.cur_row), r.is_valid(other.cur_row)) {
(false, true) if sort_options.nulls_first => return Ok(Ordering::Less),
(false, true) => return Ok(Ordering::Greater),
(true, false) if sort_options.nulls_first => {
return Ok(Ordering::Greater)
}
(true, false) => return Ok(Ordering::Less),
(false, false) => {}
(true, true) => match cmp[i](self.cur_row, other.cur_row) {
Ordering::Equal => {}
o if sort_options.descending => return Ok(o.reverse()),
o => return Ok(o),
},
}
}

// Break ties using stream_idx to ensure a predictable
// ordering of rows when comparing equal streams.
Ok(self.stream_idx.cmp(&other.stream_idx))
}

/// Initialize a collection of comparators for comparing
/// columnar arrays of this cursor and "other" if needed.
fn init_cmp_if_needed(
&self,
other: &SortKeyCursor,
zipped: &[((&ArrayRef, &ArrayRef), &SortOptions)],
) -> Result<()> {
let hm = self.batch_comparators.read();
if !hm.contains_key(&other.batch_id) {
drop(hm);
let mut map = self.batch_comparators.write();
let cmp = map
.borrow_mut()
.entry(other.batch_id)
.or_insert_with(|| Vec::with_capacity(other.sort_columns.len()));

for (i, ((l, r), _)) in zipped.iter().enumerate() {
if i >= cmp.len() {
// initialise comparators
cmp.push(arrow::array::build_compare(l.as_ref(), r.as_ref())?);
}
}
}
Ok(())
}
}

impl Ord for SortKeyCursor {
/// Needed by min-heap comparison and reverse the order at the same time.
fn cmp(&self, other: &Self) -> Ordering {
other.compare(self).unwrap()
}
}

impl PartialEq for SortKeyCursor {
fn eq(&self, other: &Self) -> bool {
self.current() == other.current()
other.compare(self).unwrap() == Ordering::Equal
}
}

impl Eq for SortKeyCursor {}

impl PartialOrd for SortKeyCursor {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl Ord for SortKeyCursor {
fn cmp(&self, other: &Self) -> Ordering {
self.current()
.cmp(&other.current())
.then_with(|| self.stream_idx.cmp(&other.stream_idx))
other.compare(self).ok()
}
}
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ impl ExternalSorter {
&self.expr,
tracking_metrics,
self.session_config.batch_size(),
)?))
)))
} else if in_mem_batches.len() > 0 {
let tracking_metrics = self
.metrics_set
Expand Down
Loading

0 comments on commit d303625

Please sign in to comment.