Skip to content

Commit

Permalink
Implement std::error::Error::source() for DataFusionError, make `…
Browse files Browse the repository at this point in the history
…DataFusionError::find_root` more generic (#4992)

* Implement `std::error::Error::source()` for DataFusionError

* Update implementation of DataFusionError::find_root

* Fix avro build
  • Loading branch information
alamb authored Jan 22, 2023
1 parent 350cb47 commit e713547
Showing 1 changed file with 69 additions and 83 deletions.
152 changes: 69 additions & 83 deletions datafusion/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! DataFusion error types

use std::error;
use std::error::Error;
use std::fmt::{Display, Formatter};
use std::io;
use std::result;
Expand All @@ -40,7 +40,7 @@ pub type Result<T, E = DataFusionError> = result::Result<T, E>;
pub type SharedResult<T> = result::Result<T, Arc<DataFusionError>>;

/// Error type for generic operations that could result in DataFusionError::External
pub type GenericError = Box<dyn error::Error + Send + Sync>;
pub type GenericError = Box<dyn Error + Send + Sync>;

/// DataFusion error
#[derive(Debug)]
Expand Down Expand Up @@ -206,6 +206,8 @@ impl Display for SchemaError {
}
}

impl Error for SchemaError {}

impl From<io::Error> for DataFusionError {
fn from(e: io::Error) -> Self {
DataFusionError::IoError(e)
Expand Down Expand Up @@ -331,20 +333,39 @@ impl Display for DataFusionError {
}
}

impl error::Error for DataFusionError {}
impl Error for DataFusionError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
DataFusionError::ArrowError(e) => Some(e),
#[cfg(feature = "parquet")]
DataFusionError::ParquetError(e) => Some(e),
#[cfg(feature = "avro")]
DataFusionError::AvroError(e) => Some(e),
#[cfg(feature = "object_store")]
DataFusionError::ObjectStore(e) => Some(e),
DataFusionError::IoError(e) => Some(e),
DataFusionError::SQL(e) => Some(e),
DataFusionError::NotImplemented(_) => None,
DataFusionError::Internal(_) => None,
DataFusionError::Plan(_) => None,
DataFusionError::SchemaError(e) => Some(e),
DataFusionError::Execution(_) => None,
DataFusionError::ResourcesExhausted(_) => None,
DataFusionError::External(e) => Some(e.as_ref()),
#[cfg(feature = "jit")]
DataFusionError::JITError(e) => Some(e),
DataFusionError::Context(_, e) => Some(e.as_ref()),
DataFusionError::Substrait(_) => None,
}
}
}

impl From<DataFusionError> for io::Error {
fn from(e: DataFusionError) -> Self {
io::Error::new(io::ErrorKind::Other, e)
}
}

/// Helper for [`DataFusionError::find_root`].
enum OtherErr<'a> {
Arrow(&'a ArrowError),
Dyn(&'a (dyn std::error::Error + Send + Sync + 'static)),
}

impl DataFusionError {
/// Get deepest underlying [`DataFusionError`]
///
Expand All @@ -362,84 +383,49 @@ impl DataFusionError {
///
/// This may be the same as `self`.
pub fn find_root(&self) -> &Self {
// Note: This is a non-recursive algorithm so we do not run out of stack space, even for long error chains. The
// algorithm will always terminate because all steps access the next error through "converging" ownership,
// i.e. there can be a fan-in by multiple parents (e.g. via `Arc`), but never a fan-out by multiple
// children (e.g. via `Weak` or interior mutability via `Mutex`).

// last error in the chain that was a DataFusionError
let mut checkpoint: &Self = self;

// current non-DataFusion error
let mut other_e: Option<OtherErr<'_>> = None;

loop {
// do we have another error type to explore?
if let Some(inner) = other_e {
// `other_e` is now bound to `inner`, so we can clear this path
other_e = None;

match inner {
OtherErr::Arrow(inner) => {
if let ArrowError::ExternalError(inner) = inner {
other_e = Some(OtherErr::Dyn(inner.as_ref()));
continue;
}
}
OtherErr::Dyn(inner) => {
if let Some(inner) = inner.downcast_ref::<Self>() {
checkpoint = inner;
continue;
}

if let Some(inner) = inner.downcast_ref::<ArrowError>() {
other_e = Some(OtherErr::Arrow(inner));
continue;
}

// some errors are wrapped into `Arc`s to share them with multiple receivers
if let Some(inner) = inner.downcast_ref::<Arc<Self>>() {
checkpoint = inner.as_ref();
continue;
}

if let Some(inner) = inner.downcast_ref::<Arc<ArrowError>>() {
other_e = Some(OtherErr::Arrow(inner.as_ref()));
continue;
}
}
}

// dead end?
break;
}

// traverse context chain
if let Self::Context(_msg, inner) = checkpoint {
checkpoint = inner;
continue;
// Note: This is a non-recursive algorithm so we do not run
// out of stack space, even for long error chains.

let mut last_datafusion_error = self;
let mut root_error: &dyn Error = self;
while let Some(source) = find_source(root_error) {
// walk the next level
root_error = source;
// remember the lowest datafusion error so far
if let Some(e) = root_error.downcast_ref::<DataFusionError>() {
last_datafusion_error = e;
}

// The Arrow error may itself contain a datafusion error again
// See https://github.com/apache/arrow-datafusion/issues/4172
if let Self::ArrowError(inner) = checkpoint {
other_e = Some(OtherErr::Arrow(inner));
continue;
}

// also try to introspect direct external errors
if let Self::External(inner) = checkpoint {
other_e = Some(OtherErr::Dyn(inner.as_ref()));
continue;
}

// no more traversal
break;
}

// return last checkpoint (which may be the original error)
checkpoint
last_datafusion_error
}
}

fn find_source<'a>(e: &'a (dyn Error + 'static)) -> Option<&'a (dyn Error + 'static)> {
// workaround until https://github.com/apache/arrow-rs/issues/3566 is released
if let Some(e) = e.downcast_ref::<ArrowError>() {
return if let ArrowError::ExternalError(e) = e {
Some(e.as_ref())
} else {
None
};
}
// some errors are wrapped into `Arc`s to share them with multiple
// receivers, so handle that specially here
if let Some(e) = e.downcast_ref::<Arc<dyn Error + 'static>>() {
return Some(e.as_ref());
}

// For some reason the above doesn't capture works for
// Arc<DataFusionError> or Arc<ArrowError>
if let Some(e) = e.downcast_ref::<Arc<ArrowError>>() {
return Some(e.as_ref());
}
if let Some(e) = e.downcast_ref::<Arc<DataFusionError>>() {
return Some(e.as_ref());
}

e.source()
}

#[cfg(test)]
Expand Down

0 comments on commit e713547

Please sign in to comment.