Use DataFusionError instead of ArrowError in SendableRecordBatchStream (#5101)

* Replace ArrowError with DataFusionError in DF context

* fix comments
comphead authored Jan 30, 2023
1 parent 9c8bdfe commit 74b05fa
Showing 38 changed files with 213 additions and 254 deletions.
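At a glance, the change swaps the error type carried by SendableRecordBatchStream: operators used to yield arrow's Result<RecordBatch> (an ArrowError on failure) and had to wrap DataFusion errors in ArrowError::ExternalError; they now yield datafusion_common::Result<RecordBatch>, so DataFusion errors pass through untouched and the remaining Arrow errors are converted once via the From<ArrowError> for DataFusionError impl (the .map_err(Into::into) calls throughout the diff). The sketch below is illustrative only and not part of the commit; the adapt helper is hypothetical.

use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use datafusion_common::{DataFusionError, Result};

// Before: stream items were Arrow results, so operator errors had to be
// smuggled through as ArrowError::ExternalError(Box::new(e)).
type OldStreamItem = std::result::Result<RecordBatch, ArrowError>;

// After: stream items are DataFusion results; an ArrowError converts via `From`.
type NewStreamItem = Result<RecordBatch>;

// Hypothetical helper showing the conversion direction used throughout the diff.
fn adapt(item: OldStreamItem) -> NewStreamItem {
    item.map_err(DataFusionError::from) // equivalent to .map_err(Into::into)
}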
datafusion/core/src/physical_plan/aggregates/mod.rs: 3 changes (1 addition & 2 deletions)
@@ -722,7 +722,6 @@ mod tests {
use crate::{assert_batches_sorted_eq, physical_plan::common};
use arrow::array::{Float64Array, UInt32Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_physical_expr::expressions::{lit, ApproxDistinct, Count, Median};
@@ -1038,7 +1037,7 @@
}

impl Stream for TestYieldingStream {
-type Item = ArrowResult<RecordBatch>;
+type Item = Result<RecordBatch>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
datafusion/core/src/physical_plan/aggregates/no_grouping.rs: 9 changes (4 additions & 5 deletions)
@@ -25,7 +25,6 @@ use crate::physical_plan::aggregates::{
use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
use arrow::datatypes::SchemaRef;
-use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_physical_expr::{AggregateExpr, PhysicalExpr};
@@ -38,7 +37,7 @@ use futures::stream::{Stream, StreamExt};

/// stream struct for aggregation without grouping columns
pub(crate) struct AggregateStream {
-stream: BoxStream<'static, ArrowResult<RecordBatch>>,
+stream: BoxStream<'static, Result<RecordBatch>>,
schema: SchemaRef,
}

@@ -112,17 +111,17 @@ impl AggregateStream {
.and_then(|allocated| this.reservation.try_grow(allocated))
{
Ok(_) => continue,
-Err(e) => Err(ArrowError::ExternalError(Box::new(e))),
+Err(e) => Err(e),
}
}
Some(Err(e)) => Err(e),
None => {
this.finished = true;
let timer = this.baseline_metrics.elapsed_compute().timer();
let result = finalize_aggregation(&this.accumulators, &this.mode)
-.map_err(|e| ArrowError::ExternalError(Box::new(e)))
.and_then(|columns| {
RecordBatch::try_new(this.schema.clone(), columns)
+.map_err(Into::into)
})
.record_output(&this.baseline_metrics);

@@ -146,7 +145,7 @@ impl AggregateStream {
}

impl Stream for AggregateStream {
-type Item = ArrowResult<RecordBatch>;
+type Item = Result<RecordBatch>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
datafusion/core/src/physical_plan/aggregates/row_hash.rs: 14 changes (4 additions & 10 deletions)
@@ -42,13 +42,10 @@ use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};

use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation};
use arrow::array::{new_null_array, PrimitiveArray};
+use arrow::array::{Array, UInt32Builder};
use arrow::compute::cast;
use arrow::datatypes::{DataType, Schema, UInt32Type};
use arrow::{array::ArrayRef, compute};
-use arrow::{
-array::{Array, UInt32Builder},
-error::{ArrowError, Result as ArrowResult},
-};
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use datafusion_common::{DataFusionError, ScalarValue};
use datafusion_expr::Accumulator;
@@ -226,7 +223,7 @@ impl GroupedHashAggregateStream {
}

impl Stream for GroupedHashAggregateStream {
-type Item = ArrowResult<RecordBatch>;
+type Item = Result<RecordBatch>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
@@ -252,9 +249,7 @@ impl Stream for GroupedHashAggregateStream {
});

if let Err(e) = result {
-return Poll::Ready(Some(Err(
-ArrowError::ExternalError(Box::new(e)),
-)));
+return Poll::Ready(Some(Err(e)));
}
}
// inner had error, return to caller
@@ -569,7 +564,7 @@ impl std::fmt::Debug for RowAggregationState {

impl GroupedHashAggregateStream {
/// Create a RecordBatch with all group keys and accumulator' states or values.
-fn create_batch_from_map(&mut self) -> ArrowResult<Option<RecordBatch>> {
+fn create_batch_from_map(&mut self) -> Result<Option<RecordBatch>> {
let skip_items = self.row_group_skip_position;
if skip_items > self.row_aggr_state.group_states.len() {
return Ok(None);
@@ -624,7 +619,6 @@ impl GroupedHashAggregateStream {
// the intermediate GroupByScalar type was not the same as the
// output
cast(&item, field.data_type())
-.map_err(DataFusionError::ArrowError)
}?;
results.push(result);
}
datafusion/core/src/physical_plan/analyze.rs: 3 changes (2 additions & 1 deletion)
@@ -200,7 +200,8 @@ impl ExecutionPlan for AnalyzeExec {
Arc::new(type_builder.finish()),
Arc::new(plan_builder.finish()),
],
-);
+)
+.map_err(Into::into);
// again ignore error
tx.send(maybe_batch).await.ok();
});
datafusion/core/src/physical_plan/coalesce_batches.rs: 4 changes (2 additions & 2 deletions)
@@ -180,7 +180,7 @@ struct CoalesceBatchesStream {
}

impl Stream for CoalesceBatchesStream {
-type Item = ArrowResult<RecordBatch>;
+type Item = Result<RecordBatch>;

fn poll_next(
mut self: Pin<&mut Self>,
@@ -200,7 +200,7 @@ impl CoalesceBatchesStream {
fn poll_next_inner(
self: &mut Pin<&mut Self>,
cx: &mut Context<'_>,
-) -> Poll<Option<ArrowResult<RecordBatch>>> {
+) -> Poll<Option<Result<RecordBatch>>> {
// Get a clone (uses same underlying atomic) as self gets borrowed below
let cloned_time = self.baseline_metrics.elapsed_compute().clone();

datafusion/core/src/physical_plan/coalesce_partitions.rs: 8 changes (4 additions & 4 deletions)
@@ -25,8 +25,8 @@ use std::task::Poll;
use futures::Stream;
use tokio::sync::mpsc;

+use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
-use arrow::{datatypes::SchemaRef, error::Result as ArrowResult};

use super::common::AbortOnDropMany;
use super::expressions::PhysicalSortExpr;
@@ -138,7 +138,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
// least one result in an attempt to maximize
// parallelism.
let (sender, receiver) =
-mpsc::channel::<ArrowResult<RecordBatch>>(input_partitions);
+mpsc::channel::<Result<RecordBatch>>(input_partitions);

// spawn independent tasks whose resulting streams (of batches)
// are sent to the channel for consumption.
@@ -185,14 +185,14 @@ impl ExecutionPlan for CoalescePartitionsExec {

struct MergeStream {
schema: SchemaRef,
-input: mpsc::Receiver<ArrowResult<RecordBatch>>,
+input: mpsc::Receiver<Result<RecordBatch>>,
baseline_metrics: BaselineMetrics,
#[allow(unused)]
drop_helper: AbortOnDropMany<()>,
}

impl Stream for MergeStream {
-type Item = ArrowResult<RecordBatch>;
+type Item = Result<RecordBatch>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
datafusion/core/src/physical_plan/common.rs: 27 changes (12 additions & 15 deletions)
@@ -25,7 +25,6 @@ use crate::physical_plan::{displayable, ColumnStatistics, ExecutionPlan, Statist
use arrow::compute::concat;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::ArrowError;
-use arrow::error::Result as ArrowResult;
use arrow::ipc::writer::{FileWriter, IpcWriteOptions};
use arrow::record_batch::RecordBatch;
use datafusion_physical_expr::utils::ordering_satisfy;
@@ -68,7 +67,7 @@ impl SizedRecordBatchStream {
}

impl Stream for SizedRecordBatchStream {
-type Item = ArrowResult<RecordBatch>;
+type Item = Result<RecordBatch>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
@@ -92,10 +91,7 @@ impl RecordBatchStream for SizedRecordBatchStream {

/// Create a vector of record batches from a stream
pub async fn collect(stream: SendableRecordBatchStream) -> Result<Vec<RecordBatch>> {
-stream
-.try_collect::<Vec<_>>()
-.await
-.map_err(DataFusionError::from)
+stream.try_collect::<Vec<_>>().await
}

/// Merge two record batch references into a single record batch.
@@ -104,15 +100,16 @@ pub fn merge_batches(
first: &RecordBatch,
second: &RecordBatch,
schema: SchemaRef,
-) -> ArrowResult<RecordBatch> {
+) -> Result<RecordBatch> {
let columns = (0..schema.fields.len())
.map(|index| {
let first_column = first.column(index).as_ref();
let second_column = second.column(index).as_ref();
concat(&[first_column, second_column])
})
-.collect::<ArrowResult<Vec<_>>>()?;
-RecordBatch::try_new(schema, columns)
+.collect::<Result<Vec<_>, ArrowError>>()
+.map_err(Into::<DataFusionError>::into)?;
+RecordBatch::try_new(schema, columns).map_err(Into::into)
}

/// Merge a slice of record batch references into a single record batch, or
@@ -121,7 +118,7 @@ pub fn merge_multiple_batches(
pub fn merge_multiple_batches(
batches: &[&RecordBatch],
schema: SchemaRef,
-) -> ArrowResult<Option<RecordBatch>> {
+) -> Result<Option<RecordBatch>> {
Ok(if batches.is_empty() {
None
} else {
@@ -134,7 +131,8 @@ pub fn merge_multiple_batches(
.collect::<Vec<_>>(),
)
})
-.collect::<ArrowResult<Vec<_>>>()?;
+.collect::<Result<Vec<_>, ArrowError>>()
+.map_err(Into::<DataFusionError>::into)?;
Some(RecordBatch::try_new(schema, columns)?)
})
}
@@ -190,7 +188,7 @@ fn build_file_list_recurse(
/// Spawns a task to the tokio threadpool and writes its outputs to the provided mpsc sender
pub(crate) fn spawn_execution(
input: Arc<dyn ExecutionPlan>,
-output: mpsc::Sender<ArrowResult<RecordBatch>>,
+output: mpsc::Sender<Result<RecordBatch>>,
partition: usize,
context: Arc<TaskContext>,
) -> JoinHandle<()> {
@@ -199,8 +197,7 @@ pub(crate) fn spawn_execution(
Err(e) => {
// If send fails, plan being torn down,
// there is no place to send the error.
-let arrow_error = ArrowError::ExternalError(Box::new(e));
-output.send(Err(arrow_error)).await.ok();
+output.send(Err(e)).await.ok();
debug!(
"Stopping execution: error executing input: {}",
displayable(input.as_ref()).one_line()
@@ -524,7 +521,7 @@ impl IPCWriter {

/// Finish the writer
pub fn finish(&mut self) -> Result<()> {
-self.writer.finish().map_err(DataFusionError::ArrowError)
+self.writer.finish().map_err(Into::into)
}

/// Path write to
datafusion/core/src/physical_plan/file_format/csv.rs: 2 changes (1 addition & 1 deletion)
@@ -452,7 +452,7 @@ mod tests {
let err = it.next().await.unwrap().unwrap_err().to_string();
assert_eq!(
err,
-"Csv error: incorrect number of fields for line 1, expected 14 got 13"
+"Arrow error: Csv error: incorrect number of fields for line 1, expected 14 got 13"
);
Ok(())
}
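The expected string gains the outer "Arrow error: " prefix because the CSV failure now reaches the caller as a DataFusionError wrapping the underlying ArrowError, and each layer's Display adds its own prefix. A sketch of that nesting (not part of the commit; the exact Display wording is an assumption inferred from the assertion above):

use arrow::error::ArrowError;
use datafusion_common::DataFusionError;

fn error_message_nesting() {
    // ArrowError::CsvError displays as "Csv error: ...".
    let csv_err = ArrowError::CsvError(
        "incorrect number of fields for line 1, expected 14 got 13".to_string(),
    );
    // Wrapping it as a DataFusionError prepends "Arrow error: ",
    // which is the prefix the updated assertion now expects.
    let df_err = DataFusionError::from(csv_err);
    assert_eq!(
        df_err.to_string(),
        "Arrow error: Csv error: incorrect number of fields for line 1, expected 14 got 13"
    );
}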
datafusion/core/src/physical_plan/file_format/file_stream.rs: 26 changes (14 additions & 12 deletions)
@@ -27,7 +27,8 @@ use std::task::{Context, Poll};
use std::time::Instant;

use arrow::datatypes::SchemaRef;
-use arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
+use arrow::error::ArrowError;
+use arrow::record_batch::RecordBatch;
use datafusion_common::ScalarValue;
use futures::future::BoxFuture;
use futures::stream::BoxStream;
@@ -45,7 +46,7 @@ use crate::physical_plan::RecordBatchStream;

/// A fallible future that resolves to a stream of [`RecordBatch`]
pub type FileOpenFuture =
-BoxFuture<'static, Result<BoxStream<'static, ArrowResult<RecordBatch>>>>;
+BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch, ArrowError>>>>;

/// Generic API for opening a file using an [`ObjectStore`] and resolving to a
/// stream of [`RecordBatch`]
@@ -96,7 +97,7 @@ enum FileStreamState {
/// Partitioning column values for the current batch_iter
partition_values: Vec<ScalarValue>,
/// The reader instance
-reader: BoxStream<'static, ArrowResult<RecordBatch>>,
+reader: BoxStream<'static, Result<RecordBatch, ArrowError>>,
},
/// Encountered an error
Error,
@@ -201,10 +202,7 @@ impl<F: FileOpener> FileStream<F> {
})
}

-fn poll_inner(
-&mut self,
-cx: &mut Context<'_>,
-) -> Poll<Option<ArrowResult<RecordBatch>>> {
+fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch>>> {
loop {
match &mut self.state {
FileStreamState::Idle => {
@@ -230,7 +228,7 @@ impl<F: FileOpener> FileStream<F> {
}
Err(e) => {
self.state = FileStreamState::Error;
-return Poll::Ready(Some(Err(e.into())));
+return Poll::Ready(Some(Err(e)));
}
}
}
@@ -249,7 +247,7 @@ impl<F: FileOpener> FileStream<F> {
}
Err(e) => {
self.state = FileStreamState::Error;
-return Poll::Ready(Some(Err(e.into())));
+return Poll::Ready(Some(Err(e)));
}
},
FileStreamState::Scan {
@@ -260,7 +258,11 @@ impl<F: FileOpener> FileStream<F> {
self.file_stream_metrics.time_scanning_until_data.stop();
self.file_stream_metrics.time_scanning_total.stop();
let result = result
-.and_then(|b| self.pc_projector.project(b, partition_values))
+.and_then(|b| {
+self.pc_projector
+.project(b, partition_values)
+.map_err(|e| ArrowError::ExternalError(e.into()))
+})
.map(|batch| match &mut self.remain {
Some(remain) => {
if *remain > batch.num_rows() {
@@ -280,7 +282,7 @@ impl<F: FileOpener> FileStream<F> {
self.state = FileStreamState::Error
}
self.file_stream_metrics.time_scanning_total.start();
-return Poll::Ready(Some(result));
+return Poll::Ready(Some(result.map_err(Into::into)));
}
None => {
self.file_stream_metrics.time_scanning_until_data.stop();
@@ -297,7 +299,7 @@ impl<F: FileOpener> FileStream<F> {
}

impl<F: FileOpener> Stream for FileStream<F> {
-type Item = ArrowResult<RecordBatch>;
+type Item = Result<RecordBatch>;

fn poll_next(
mut self: Pin<&mut Self>,
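Note how file_stream.rs keeps ArrowError at the inner boundary: FileOpenFuture and the per-file reader still yield Result<RecordBatch, ArrowError>, the partition-column projection error is folded back into ArrowError::ExternalError so it can be merged with reader errors, and only the outer FileStream converts to DataFusionError when it hands a batch to its Stream impl. A simplified sketch of that boundary follows (illustrative only; into_df_stream is a hypothetical helper, while the real code does the conversion inline in poll_inner):

use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use futures::stream::BoxStream;
use futures::StreamExt;

// Inner readers opened by a FileOpener keep yielding Arrow results...
type ReaderStream = BoxStream<'static, std::result::Result<RecordBatch, ArrowError>>;

// ...and the error is converted exactly once, at the DataFusion stream edge.
fn into_df_stream(reader: ReaderStream) -> BoxStream<'static, Result<RecordBatch>> {
    reader.map(|item| item.map_err(Into::into)).boxed()
}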
