-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement std::error::Error::source()
for DataFusionError
, make DataFusionError::find_root
more generic
#4992
Changes from 2 commits
4862fc0
def1727
1dd4c4c
a28e80f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,7 +17,7 @@ | |
|
||
//! DataFusion error types | ||
|
||
use std::error; | ||
use std::error::Error; | ||
use std::fmt::{Display, Formatter}; | ||
use std::io; | ||
use std::result; | ||
|
@@ -37,7 +37,7 @@ use sqlparser::parser::ParserError; | |
pub type Result<T> = result::Result<T, DataFusionError>; | ||
|
||
/// Error type for generic operations that could result in DataFusionError::External | ||
pub type GenericError = Box<dyn error::Error + Send + Sync>; | ||
pub type GenericError = Box<dyn Error + Send + Sync>; | ||
|
||
/// DataFusion error | ||
#[derive(Debug)] | ||
|
@@ -203,6 +203,8 @@ impl Display for SchemaError { | |
} | ||
} | ||
|
||
impl Error for SchemaError {} | ||
|
||
impl From<io::Error> for DataFusionError { | ||
fn from(e: io::Error) -> Self { | ||
DataFusionError::IoError(e) | ||
|
@@ -328,20 +330,39 @@ impl Display for DataFusionError { | |
} | ||
} | ||
|
||
impl error::Error for DataFusionError {} | ||
impl Error for DataFusionError { | ||
fn source(&self) -> Option<&(dyn Error + 'static)> { | ||
match self { | ||
DataFusionError::ArrowError(e) => Some(e), | ||
#[cfg(feature = "parquet")] | ||
DataFusionError::ParquetError(e) => Some(e), | ||
#[cfg(feature = "avro")] | ||
AvroError(e) => Some(e), | ||
#[cfg(feature = "object_store")] | ||
DataFusionError::ObjectStore(e) => Some(e), | ||
DataFusionError::IoError(e) => Some(e), | ||
DataFusionError::SQL(e) => Some(e), | ||
DataFusionError::NotImplemented(_) => None, | ||
DataFusionError::Internal(_) => None, | ||
DataFusionError::Plan(_) => None, | ||
DataFusionError::SchemaError(e) => Some(e), | ||
DataFusionError::Execution(_) => None, | ||
DataFusionError::ResourcesExhausted(_) => None, | ||
DataFusionError::External(e) => Some(e.as_ref()), | ||
#[cfg(feature = "jit")] | ||
DataFusionError::JITError(e) => Some(e), | ||
DataFusionError::Context(_, e) => Some(e.as_ref()), | ||
DataFusionError::Substrait(_) => None, | ||
} | ||
} | ||
} | ||
|
||
impl From<DataFusionError> for io::Error { | ||
fn from(e: DataFusionError) -> Self { | ||
io::Error::new(io::ErrorKind::Other, e) | ||
} | ||
} | ||
|
||
/// Helper for [`DataFusionError::find_root`]. | ||
enum OtherErr<'a> { | ||
Arrow(&'a ArrowError), | ||
Dyn(&'a (dyn std::error::Error + Send + Sync + 'static)), | ||
} | ||
|
||
impl DataFusionError { | ||
/// Get deepest underlying [`DataFusionError`] | ||
/// | ||
|
@@ -359,84 +380,49 @@ impl DataFusionError { | |
/// | ||
/// This may be the same as `self`. | ||
pub fn find_root(&self) -> &Self { | ||
// Note: This is a non-recursive algorithm so we do not run out of stack space, even for long error chains. The | ||
// algorithm will always terminate because all steps access the next error through "converging" ownership, | ||
// i.e. there can be a fan-in by multiple parents (e.g. via `Arc`), but never a fan-out by multiple | ||
// children (e.g. via `Weak` or interior mutability via `Mutex`). | ||
|
||
// last error in the chain that was a DataFusionError | ||
let mut checkpoint: &Self = self; | ||
|
||
// current non-DataFusion error | ||
let mut other_e: Option<OtherErr<'_>> = None; | ||
|
||
loop { | ||
// do we have another error type to explore? | ||
if let Some(inner) = other_e { | ||
// `other_e` is now bound to `inner`, so we can clear this path | ||
other_e = None; | ||
|
||
match inner { | ||
OtherErr::Arrow(inner) => { | ||
if let ArrowError::ExternalError(inner) = inner { | ||
other_e = Some(OtherErr::Dyn(inner.as_ref())); | ||
continue; | ||
} | ||
} | ||
OtherErr::Dyn(inner) => { | ||
if let Some(inner) = inner.downcast_ref::<Self>() { | ||
checkpoint = inner; | ||
continue; | ||
} | ||
|
||
if let Some(inner) = inner.downcast_ref::<ArrowError>() { | ||
other_e = Some(OtherErr::Arrow(inner)); | ||
continue; | ||
} | ||
|
||
// some errors are wrapped into `Arc`s to share them with multiple receivers | ||
if let Some(inner) = inner.downcast_ref::<Arc<Self>>() { | ||
checkpoint = inner.as_ref(); | ||
continue; | ||
} | ||
|
||
if let Some(inner) = inner.downcast_ref::<Arc<ArrowError>>() { | ||
other_e = Some(OtherErr::Arrow(inner.as_ref())); | ||
continue; | ||
} | ||
} | ||
} | ||
|
||
// dead end? | ||
break; | ||
} | ||
|
||
// traverse context chain | ||
if let Self::Context(_msg, inner) = checkpoint { | ||
checkpoint = inner; | ||
continue; | ||
// Note: This is a non-recursive algorithm so we do not run | ||
// out of stack space, even for long error chains. | ||
|
||
let mut last_datafusion_error = self; | ||
let mut root_error: &dyn Error = self; | ||
while let Some(source) = find_source(root_error) { | ||
// walk the next level | ||
root_error = source; | ||
// remember the lowest datafusion error so far | ||
if let Some(e) = root_error.downcast_ref::<DataFusionError>() { | ||
last_datafusion_error = e; | ||
} | ||
|
||
// The Arrow error may itself contain a datafusion error again | ||
// See https://github.com/apache/arrow-datafusion/issues/4172 | ||
if let Self::ArrowError(inner) = checkpoint { | ||
other_e = Some(OtherErr::Arrow(inner)); | ||
continue; | ||
} | ||
|
||
// also try to introspect direct external errors | ||
if let Self::External(inner) = checkpoint { | ||
other_e = Some(OtherErr::Dyn(inner.as_ref())); | ||
continue; | ||
} | ||
|
||
// no more traversal | ||
break; | ||
} | ||
|
||
// return last checkpoint (which may be the original error) | ||
checkpoint | ||
last_datafusion_error | ||
} | ||
} | ||
|
||
fn find_source<'a>(e: &'a (dyn Error + 'static)) -> Option<&'a (dyn Error + 'static)> { | ||
// workaround until https://github.com/apache/arrow-rs/issues/3566 is released | ||
if let Some(e) = e.downcast_ref::<ArrowError>() { | ||
return if let ArrowError::ExternalError(e) = e { | ||
Some(e.as_ref()) | ||
} else { | ||
None | ||
}; | ||
} | ||
// some errors are wrapped into `Arc`s to share them with multiple | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this necessary? The source implementation of Arc should already call this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is necessary, but I don't know why. The tests fail without it. If you could help figure out how to avoid it it would be great. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed in f3d2ae6 |
||
// receivers, so handle that specially here | ||
if let Some(e) = e.downcast_ref::<Arc<dyn Error + 'static>>() { | ||
return Some(e.as_ref()); | ||
} | ||
|
||
// For some reason the above doesn't capture works for | ||
// Arc<DataFusionError> or Arc<ArrowError> | ||
if let Some(e) = e.downcast_ref::<Arc<ArrowError>>() { | ||
return Some(e.as_ref()); | ||
} | ||
if let Some(e) = e.downcast_ref::<Arc<DataFusionError>>() { | ||
return Some(e.as_ref()); | ||
} | ||
|
||
e.source() | ||
} | ||
|
||
#[cfg(test)] | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am quite pleased with how this works (and it will work for other intervening wrapping error types if they implement
source()
as well)