Skip to content

Commit

Permalink
make SortPreservingMergeStream::build_record_batch fallible
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed May 26, 2021
1 parent 7a787c0 commit 270ca62
Showing 1 changed file with 3 additions and 4 deletions.
7 changes: 3 additions & 4 deletions datafusion/src/physical_plan/sort_preserving_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ impl SortPreservingMergeStream {
/// Drains the in_progress row indexes, and builds a new RecordBatch from them
///
/// Will then drop any cursors for which all rows have been yielded to the output
fn build_record_batch(&mut self) -> RecordBatch {
fn build_record_batch(&mut self) -> ArrowResult<RecordBatch> {
// Mapping from stream index to the index of the first buffer from that stream
let mut buffer_idx = 0;
let mut stream_to_buffer_idx = Vec::with_capacity(self.cursors.len());
Expand Down Expand Up @@ -459,7 +459,6 @@ impl SortPreservingMergeStream {
}

RecordBatch::try_new(self.schema.clone(), columns)
.expect("SortPreservingMergeStream: merged record batch was invalid")
}
}

Expand Down Expand Up @@ -490,7 +489,7 @@ impl Stream for SortPreservingMergeStream {
let stream_idx = match self.next_stream_idx() {
Ok(Some(idx)) => idx,
Ok(None) if self.in_progress.is_empty() => return Poll::Ready(None),
Ok(None) => return Poll::Ready(Some(Ok(self.build_record_batch()))),
Ok(None) => return Poll::Ready(Some(self.build_record_batch())),
Err(e) => {
self.aborted = true;
return Poll::Ready(Some(Err(ArrowError::ExternalError(Box::new(
Expand All @@ -512,7 +511,7 @@ impl Stream for SortPreservingMergeStream {
});

if self.in_progress.len() == self.target_batch_size {
return Poll::Ready(Some(Ok(self.build_record_batch())));
return Poll::Ready(Some(self.build_record_batch()));
}

// If removed the last row from the cursor, need to fetch a new record
Expand Down

0 comments on commit 270ca62

Please sign in to comment.