diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs index 9b8d19cf8b0c..3885ccf6a3df 100644 --- a/datafusion/common/src/error.rs +++ b/datafusion/common/src/error.rs @@ -17,7 +17,7 @@ //! DataFusion error types -use std::error; +use std::error::Error; use std::fmt::{Display, Formatter}; use std::io; use std::result; @@ -37,7 +37,7 @@ use sqlparser::parser::ParserError; pub type Result = result::Result; /// Error type for generic operations that could result in DataFusionError::External -pub type GenericError = Box; +pub type GenericError = Box; /// DataFusion error #[derive(Debug)] @@ -203,6 +203,8 @@ impl Display for SchemaError { } } +impl Error for SchemaError {} + impl From for DataFusionError { fn from(e: io::Error) -> Self { DataFusionError::IoError(e) @@ -328,7 +330,32 @@ impl Display for DataFusionError { } } -impl error::Error for DataFusionError {} +impl Error for DataFusionError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + match self { + DataFusionError::ArrowError(e) => Some(e), + #[cfg(feature = "parquet")] + DataFusionError::ParquetError(e) => Some(e), + #[cfg(feature = "avro")] + DataFusionError::AvroError(e) => Some(e), + #[cfg(feature = "object_store")] + DataFusionError::ObjectStore(e) => Some(e), + DataFusionError::IoError(e) => Some(e), + DataFusionError::SQL(e) => Some(e), + DataFusionError::NotImplemented(_) => None, + DataFusionError::Internal(_) => None, + DataFusionError::Plan(_) => None, + DataFusionError::SchemaError(e) => Some(e), + DataFusionError::Execution(_) => None, + DataFusionError::ResourcesExhausted(_) => None, + DataFusionError::External(e) => Some(e.as_ref()), + #[cfg(feature = "jit")] + DataFusionError::JITError(e) => Some(e), + DataFusionError::Context(_, e) => Some(e.as_ref()), + DataFusionError::Substrait(_) => None, + } + } +} impl From for io::Error { fn from(e: DataFusionError) -> Self { @@ -336,12 +363,6 @@ impl From for io::Error { } } -/// Helper for [`DataFusionError::find_root`]. -enum OtherErr<'a> { - Arrow(&'a ArrowError), - Dyn(&'a (dyn std::error::Error + Send + Sync + 'static)), -} - impl DataFusionError { /// Get deepest underlying [`DataFusionError`] /// @@ -359,84 +380,49 @@ impl DataFusionError { /// /// This may be the same as `self`. pub fn find_root(&self) -> &Self { - // Note: This is a non-recursive algorithm so we do not run out of stack space, even for long error chains. The - // algorithm will always terminate because all steps access the next error through "converging" ownership, - // i.e. there can be a fan-in by multiple parents (e.g. via `Arc`), but never a fan-out by multiple - // children (e.g. via `Weak` or interior mutability via `Mutex`). - - // last error in the chain that was a DataFusionError - let mut checkpoint: &Self = self; - - // current non-DataFusion error - let mut other_e: Option> = None; - - loop { - // do we have another error type to explore? - if let Some(inner) = other_e { - // `other_e` is now bound to `inner`, so we can clear this path - other_e = None; - - match inner { - OtherErr::Arrow(inner) => { - if let ArrowError::ExternalError(inner) = inner { - other_e = Some(OtherErr::Dyn(inner.as_ref())); - continue; - } - } - OtherErr::Dyn(inner) => { - if let Some(inner) = inner.downcast_ref::() { - checkpoint = inner; - continue; - } - - if let Some(inner) = inner.downcast_ref::() { - other_e = Some(OtherErr::Arrow(inner)); - continue; - } - - // some errors are wrapped into `Arc`s to share them with multiple receivers - if let Some(inner) = inner.downcast_ref::>() { - checkpoint = inner.as_ref(); - continue; - } - - if let Some(inner) = inner.downcast_ref::>() { - other_e = Some(OtherErr::Arrow(inner.as_ref())); - continue; - } - } - } - - // dead end? - break; - } - - // traverse context chain - if let Self::Context(_msg, inner) = checkpoint { - checkpoint = inner; - continue; + // Note: This is a non-recursive algorithm so we do not run + // out of stack space, even for long error chains. + + let mut last_datafusion_error = self; + let mut root_error: &dyn Error = self; + while let Some(source) = find_source(root_error) { + // walk the next level + root_error = source; + // remember the lowest datafusion error so far + if let Some(e) = root_error.downcast_ref::() { + last_datafusion_error = e; } - - // The Arrow error may itself contain a datafusion error again - // See https://github.com/apache/arrow-datafusion/issues/4172 - if let Self::ArrowError(inner) = checkpoint { - other_e = Some(OtherErr::Arrow(inner)); - continue; - } - - // also try to introspect direct external errors - if let Self::External(inner) = checkpoint { - other_e = Some(OtherErr::Dyn(inner.as_ref())); - continue; - } - - // no more traversal - break; } - // return last checkpoint (which may be the original error) - checkpoint + last_datafusion_error + } +} + +fn find_source<'a>(e: &'a (dyn Error + 'static)) -> Option<&'a (dyn Error + 'static)> { + // workaround until https://github.com/apache/arrow-rs/issues/3566 is released + if let Some(e) = e.downcast_ref::() { + return if let ArrowError::ExternalError(e) = e { + Some(e.as_ref()) + } else { + None + }; } + // some errors are wrapped into `Arc`s to share them with multiple + // receivers, so handle that specially here + if let Some(e) = e.downcast_ref::>() { + return Some(e.as_ref()); + } + + // For some reason the above doesn't capture works for + // Arc or Arc + if let Some(e) = e.downcast_ref::>() { + return Some(e.as_ref()); + } + if let Some(e) = e.downcast_ref::>() { + return Some(e.as_ref()); + } + + e.source() } #[cfg(test)]