fix(4981): incorrect error wrapping in OnceFut #4983

Merged 2 commits on Jan 20, 2023
Changes from 1 commit
3 changes: 3 additions & 0 deletions datafusion/common/src/error.rs
@@ -36,6 +36,9 @@ use sqlparser::parser::ParserError;
 /// Result type for operations that could result in an [DataFusionError]
 pub type Result<T> = result::Result<T, DataFusionError>;

+/// Result type for operations that could result in an [DataFusionError] and needs to be shared (wrapped into `Arc`).
+pub type SharedResult<T> = result::Result<T, Arc<DataFusionError>>;
@tustvold (Contributor), Jan 20, 2023:

Given this PR never actually returns this type, do we need to declare it here? Or could we just move it into utils?

@DDtKey (Contributor Author), Jan 20, 2023:

Well, I observed that Result<T, Arc<Error>> is quite a common approach to allow cloning errors in DataFusion & arrow.

So that we don't miss this kind of error, I think it would be nice to have it as a general type.
Take a look at #4992 and even the current find_root impl: it's easy to forget that an error could actually be wrapped into an Arc (IMO).

I believe this type should be used when it's necessary to declare explicitly that the error is wrapped into an Arc. It could also be used in other places later.

This PR doesn't return this type; however, it will be inside ArrowError::ExternalError(..), so it's actually a way of explicit usage, to be able to check all usages of this pattern.
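
For illustration only, here is a minimal std-only sketch of why an `Arc`-wrapped error makes a result cloneable. `EngineError` and `fail_shared` are hypothetical stand-ins and not part of DataFusion; only the shape of the `SharedResult` alias follows the diff above.

```rust
use std::error::Error;
use std::sync::Arc;

/// Hypothetical stand-in for an error type that cannot derive `Clone`.
#[derive(Debug)]
struct EngineError {
    message: String,
    // A boxed trait object is not `Clone`, so `EngineError` is not either.
    source: Option<Box<dyn Error + Send + Sync>>,
}

/// Same shape as the alias in the diff, but over the hypothetical error type.
type SharedResult<T> = Result<T, Arc<EngineError>>;

/// Builds a failed result whose error lives behind an `Arc`.
fn fail_shared(message: &str) -> SharedResult<u32> {
    Err(Arc::new(EngineError {
        message: message.to_string(),
        source: None,
    }))
}
```

Cloning such a result only bumps the reference count, so every clone points at the same error value.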

@tustvold (Contributor), Jan 20, 2023:

> allow cloning errors in DataFusion & arrow.

I don't see any other examples of this pattern?

> so actually it's a way of explicit usage to be able to check all usages of this pattern

I'd hope that any codepath that cares about these variants would walk the source tree and effectively blindly skip over the Arc...

Contributor:

See #4992 for the code (that does indeed walk over Arcs). It was not ideal, however. Figuring out some way to avoid Arc'ing errors would be better in my opinion (not for this PR).
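
As a rough sketch of what "walking over Arcs" can look like, the std-only example below uses a hypothetical `AppError` enum. It is not the code from #4992 or DataFusion's `find_root`; it only shows the downcast pattern, in which both a plain error and an `Arc`-wrapped one have to be tried.

```rust
use std::error::Error;
use std::fmt;
use std::sync::Arc;

type GenericError = Box<dyn Error + Send + Sync>;

/// Hypothetical error enum with a leaf variant and a type-erased wrapper.
#[derive(Debug)]
enum AppError {
    Csv(String),
    External(GenericError),
}

impl fmt::Display for AppError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            AppError::Csv(msg) => write!(f, "csv error: {}", msg),
            AppError::External(e) => write!(f, "external error: {}", e),
        }
    }
}

impl Error for AppError {}

impl AppError {
    /// Follows `External` wrappers down to the innermost `AppError`,
    /// looking through both plain and `Arc`-wrapped values.
    fn find_root(&self) -> &AppError {
        let mut current = self;
        while let AppError::External(inner) = current {
            if let Some(e) = inner.downcast_ref::<AppError>() {
                current = e;
            } else if let Some(e) = inner.downcast_ref::<Arc<AppError>>() {
                // Easy to forget: the boxed value may be an `Arc<AppError>`.
                current = e.as_ref();
            } else {
                break; // some unrelated error type, stop here
            }
        }
        current
    }
}
```

Without the second branch, the walk would stop at the first `Arc` and report the wrapper as the root.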


 /// Error type for generic operations that could result in DataFusionError::External
 pub type GenericError = Box<dyn error::Error + Send + Sync>;

2 changes: 1 addition & 1 deletion datafusion/common/src/lib.rs
@@ -34,7 +34,7 @@ pub mod test_util;
 use arrow::compute::SortOptions;
 pub use column::Column;
 pub use dfschema::{DFField, DFSchema, DFSchemaRef, ExprSchema, ToDFSchema};
-pub use error::{field_not_found, DataFusionError, Result, SchemaError};
+pub use error::{field_not_found, DataFusionError, Result, SchemaError, SharedResult};
 pub use parsers::parse_interval;
 pub use scalar::{ScalarType, ScalarValue};
 pub use stats::{ColumnStatistics, Statistics};
2 changes: 1 addition & 1 deletion datafusion/core/src/error.rs
@@ -16,4 +16,4 @@
 // under the License.

 //! DataFusion error types
-pub use datafusion_common::{DataFusionError, Result};
+pub use datafusion_common::{DataFusionError, Result, SharedResult};
50 changes: 45 additions & 5 deletions datafusion/core/src/physical_plan/joins/utils.rs
@@ -17,7 +17,7 @@

 //! Join related functionality used both on logical and physical plans

-use crate::error::{DataFusionError, Result};
+use crate::error::{DataFusionError, Result, SharedResult};
 use crate::logical_expr::JoinType;
 use crate::physical_plan::expressions::Column;
 use crate::physical_plan::SchemaRef;
@@ -438,7 +438,7 @@ impl<T: 'static> OnceAsync<T> {
 }

 /// The shared future type used internally within [`OnceAsync`]
-type OnceFutPending<T> = Shared<BoxFuture<'static, Arc<Result<T>>>>;
+type OnceFutPending<T> = Shared<BoxFuture<'static, Arc<SharedResult<T>>>>;
Contributor:

Now that SharedResult has an Arc, can we avoid the double-arc here?

@DDtKey (Contributor Author), Jan 19, 2023:

OnceFut is cloneable, while SharedResult wraps only Err into Arc to allow cloning the error itself 🤔

I decided to declare it as SharedResult<Arc<T>> instead, so it won't be an Arc inside an Arc. Either the content of Ok or the error will be wrapped into an Arc.
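
To make the two shapes discussed here concrete, the following std-only sketch compares them. It assumes `String` as a placeholder error type, and `double_arc` and `single_arc` are hypothetical helper names, not functions from the PR.

```rust
use std::sync::Arc;

/// Placeholder alias: `String` stands in for the real error type.
type SharedResult<T> = Result<T, Arc<String>>;

/// Shape shown in the diff: the whole result sits behind an `Arc`,
/// and the error is wrapped in a second `Arc` inside it.
fn double_arc<T>(res: Result<T, String>) -> Arc<SharedResult<T>> {
    Arc::new(res.map_err(Arc::new))
}

/// Shape described in the reply: exactly one `Arc` on either side,
/// so the result is cloneable without nesting.
fn single_arc<T>(res: Result<T, String>) -> SharedResult<Arc<T>> {
    res.map(Arc::new).map_err(Arc::new)
}
```

With the second shape, a clone shares the `Ok` payload or the error directly, and reading the value needs only one `as_ref()` instead of two.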


 /// A [`OnceFut`] represents a shared asynchronous computation, that will be evaluated
 /// once for all [`Clone`]'s, with [`OnceFut::get`] providing a non-consuming interface
@@ -653,7 +653,7 @@ fn get_int_range(min: ScalarValue, max: ScalarValue) -> Option<usize> {

 enum OnceFutState<T> {
     Pending(OnceFutPending<T>),
-    Ready(Arc<Result<T>>),
+    Ready(Arc<SharedResult<T>>),
DDtKey marked this conversation as resolved.
 }

 impl<T> Clone for OnceFutState<T> {
@@ -672,7 +672,11 @@ impl<T: 'static> OnceFut<T> {
         Fut: Future<Output = Result<T>> + Send + 'static,
     {
         Self {
-            state: OnceFutState::Pending(fut.map(Arc::new).boxed().shared()),
+            state: OnceFutState::Pending(
+                fut.map(|res| Arc::new(res.map_err(Arc::new)))
+                    .boxed()
+                    .shared(),
+            ),
         }
     }

@@ -692,7 +696,7 @@ impl<T: 'static> OnceFut<T> {
             OnceFutState::Ready(r) => Poll::Ready(
                 r.as_ref()
                     .as_ref()
-                    .map_err(|e| ArrowError::ExternalError(e.to_string().into())),
+                    .map_err(|e| ArrowError::ExternalError(Box::new(e.clone()))),
             ),
         }
     }
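
The hunk above replaces a string conversion with a boxed clone of the shared error. The std-only sketch below illustrates the difference under stated assumptions: `CsvError`, `wrap_as_string`, `wrap_as_shared` and `is_csv_error` are hypothetical names used only for this example.

```rust
use std::error::Error;
use std::fmt;
use std::sync::Arc;

type GenericError = Box<dyn Error + Send + Sync>;

/// Hypothetical concrete error type.
#[derive(Debug)]
struct CsvError(String);

impl fmt::Display for CsvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "csv error: {}", self.0)
    }
}

impl Error for CsvError {}

/// Old approach: flatten the error to its message. The concrete type is lost.
fn wrap_as_string(err: &Arc<CsvError>) -> GenericError {
    err.to_string().into()
}

/// New approach: box a clone of the `Arc`. The concrete type survives.
fn wrap_as_shared(err: &Arc<CsvError>) -> GenericError {
    Box::new(Arc::clone(err))
}

/// Checks whether the type-erased error still carries an `Arc<CsvError>`.
fn is_csv_error(err: &GenericError) -> bool {
    err.downcast_ref::<Arc<CsvError>>().is_some()
}
```

Both wrappers print the same message, but only the second can be downcast back to the original error, which is what a root-cause lookup relies on.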
@@ -939,6 +943,7 @@ mod tests {
     use super::*;
     use arrow::datatypes::DataType;
     use datafusion_common::ScalarValue;
+    use std::pin::Pin;

     fn check(left: &[Column], right: &[Column], on: &[(Column, Column)]) -> Result<()> {
         let left = left
@@ -971,6 +976,41 @@ mod tests {
         assert!(check(&left, &right, on).is_err());
     }

+    #[tokio::test]
+    async fn check_error_nesting() {
+        let once_fut = OnceFut::<()>::new(async {
+            Err(DataFusionError::ArrowError(ArrowError::CsvError(
+                "some error".to_string(),
+            )))
+        });
+
+        struct TestFut(OnceFut<()>);
+        impl Future for TestFut {
+            type Output = ArrowResult<()>;
+
+            fn poll(
+                mut self: Pin<&mut Self>,
+                cx: &mut Context<'_>,
+            ) -> Poll<Self::Output> {
+                match ready!(self.0.get(cx)) {
+                    Ok(()) => Poll::Ready(Ok(())),
+                    Err(e) => Poll::Ready(Err(e)),
+                }
+            }
+        }
+
+        let res = TestFut(once_fut).await;
+        let arrow_err_from_fut = res.expect_err("once_fut always return error");
+
+        let wrapped_err = DataFusionError::from(arrow_err_from_fut);
+        let root_err = wrapped_err.find_root();
+
+        assert!(matches!(
+            root_err,
+            DataFusionError::ArrowError(ArrowError::CsvError(_))
+        ))
+    }
+
     #[test]
     fn check_not_in_left() {
         let left = vec![Column::new("b", 0)];