Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement std::error::Error::source() for DataFusionError, make DataFusionError::find_root more generic #4992

Merged
merged 4 commits into from
Jan 22, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 69 additions & 83 deletions datafusion/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! DataFusion error types

use std::error;
use std::error::Error;
use std::fmt::{Display, Formatter};
use std::io;
use std::result;
Expand All @@ -37,7 +37,7 @@ use sqlparser::parser::ParserError;
pub type Result<T> = result::Result<T, DataFusionError>;

/// Error type for generic operations that could result in DataFusionError::External
pub type GenericError = Box<dyn error::Error + Send + Sync>;
pub type GenericError = Box<dyn Error + Send + Sync>;

/// DataFusion error
#[derive(Debug)]
Expand Down Expand Up @@ -203,6 +203,8 @@ impl Display for SchemaError {
}
}

impl Error for SchemaError {}

impl From<io::Error> for DataFusionError {
fn from(e: io::Error) -> Self {
DataFusionError::IoError(e)
Expand Down Expand Up @@ -328,20 +330,39 @@ impl Display for DataFusionError {
}
}

impl error::Error for DataFusionError {}
impl Error for DataFusionError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
DataFusionError::ArrowError(e) => Some(e),
#[cfg(feature = "parquet")]
DataFusionError::ParquetError(e) => Some(e),
#[cfg(feature = "avro")]
DataFusionError::AvroError(e) => Some(e),
#[cfg(feature = "object_store")]
DataFusionError::ObjectStore(e) => Some(e),
DataFusionError::IoError(e) => Some(e),
DataFusionError::SQL(e) => Some(e),
DataFusionError::NotImplemented(_) => None,
DataFusionError::Internal(_) => None,
DataFusionError::Plan(_) => None,
DataFusionError::SchemaError(e) => Some(e),
DataFusionError::Execution(_) => None,
DataFusionError::ResourcesExhausted(_) => None,
DataFusionError::External(e) => Some(e.as_ref()),
#[cfg(feature = "jit")]
DataFusionError::JITError(e) => Some(e),
DataFusionError::Context(_, e) => Some(e.as_ref()),
DataFusionError::Substrait(_) => None,
}
}
}

impl From<DataFusionError> for io::Error {
fn from(e: DataFusionError) -> Self {
io::Error::new(io::ErrorKind::Other, e)
}
}

/// Helper for [`DataFusionError::find_root`].
enum OtherErr<'a> {
Arrow(&'a ArrowError),
Dyn(&'a (dyn std::error::Error + Send + Sync + 'static)),
}

impl DataFusionError {
/// Get deepest underlying [`DataFusionError`]
///
Expand All @@ -359,84 +380,49 @@ impl DataFusionError {
///
/// This may be the same as `self`.
pub fn find_root(&self) -> &Self {
// Note: This is a non-recursive algorithm so we do not run out of stack space, even for long error chains. The
// algorithm will always terminate because all steps access the next error through "converging" ownership,
// i.e. there can be a fan-in by multiple parents (e.g. via `Arc`), but never a fan-out by multiple
// children (e.g. via `Weak` or interior mutability via `Mutex`).

// last error in the chain that was a DataFusionError
let mut checkpoint: &Self = self;

// current non-DataFusion error
let mut other_e: Option<OtherErr<'_>> = None;

loop {
// do we have another error type to explore?
if let Some(inner) = other_e {
// `other_e` is now bound to `inner`, so we can clear this path
other_e = None;

match inner {
OtherErr::Arrow(inner) => {
if let ArrowError::ExternalError(inner) = inner {
other_e = Some(OtherErr::Dyn(inner.as_ref()));
continue;
}
}
OtherErr::Dyn(inner) => {
if let Some(inner) = inner.downcast_ref::<Self>() {
checkpoint = inner;
continue;
}

if let Some(inner) = inner.downcast_ref::<ArrowError>() {
other_e = Some(OtherErr::Arrow(inner));
continue;
}

// some errors are wrapped into `Arc`s to share them with multiple receivers
if let Some(inner) = inner.downcast_ref::<Arc<Self>>() {
checkpoint = inner.as_ref();
continue;
}

if let Some(inner) = inner.downcast_ref::<Arc<ArrowError>>() {
other_e = Some(OtherErr::Arrow(inner.as_ref()));
continue;
}
}
}

// dead end?
break;
}

// traverse context chain
if let Self::Context(_msg, inner) = checkpoint {
checkpoint = inner;
continue;
// Note: This is a non-recursive algorithm so we do not run
// out of stack space, even for long error chains.

let mut last_datafusion_error = self;
let mut root_error: &dyn Error = self;
while let Some(source) = find_source(root_error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am quite pleased with how this works (and it will work for other intervening wrapping error types if they implement source() as well)

// walk the next level
root_error = source;
// remember the lowest datafusion error so far
if let Some(e) = root_error.downcast_ref::<DataFusionError>() {
last_datafusion_error = e;
}

// The Arrow error may itself contain a datafusion error again
// See https://github.com/apache/arrow-datafusion/issues/4172
if let Self::ArrowError(inner) = checkpoint {
other_e = Some(OtherErr::Arrow(inner));
continue;
}

// also try to introspect direct external errors
if let Self::External(inner) = checkpoint {
other_e = Some(OtherErr::Dyn(inner.as_ref()));
continue;
}

// no more traversal
break;
}

// return last checkpoint (which may be the original error)
checkpoint
last_datafusion_error
}
}

fn find_source<'a>(e: &'a (dyn Error + 'static)) -> Option<&'a (dyn Error + 'static)> {
// workaround until https://github.com/apache/arrow-rs/issues/3566 is released
if let Some(e) = e.downcast_ref::<ArrowError>() {
return if let ArrowError::ExternalError(e) = e {
Some(e.as_ref())
} else {
None
};
}
// some errors are wrapped into `Arc`s to share them with multiple
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this necessary? The source implementation of Arc should already call this?

https://doc.rust-lang.org/src/alloc/sync.rs.html#2782

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is necessary, but I don't know why. The tests fail without it. If you could help figure out how to avoid it it would be great.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in f3d2ae6

// receivers, so handle that specially here
if let Some(e) = e.downcast_ref::<Arc<dyn Error + 'static>>() {
return Some(e.as_ref());
}

// For some reason the above doesn't capture works for
// Arc<DataFusionError> or Arc<ArrowError>
if let Some(e) = e.downcast_ref::<Arc<ArrowError>>() {
return Some(e.as_ref());
}
if let Some(e) = e.downcast_ref::<Arc<DataFusionError>>() {
return Some(e.as_ref());
}

e.source()
}

#[cfg(test)]
Expand Down