diff --git a/src/batch/src/error.rs b/src/batch/src/error.rs index ae26fca228321..e3e53ee449fac 100644 --- a/src/batch/src/error.rs +++ b/src/batch/src/error.rs @@ -19,7 +19,7 @@ use std::sync::Arc; pub use anyhow::anyhow; use parquet::errors::ParquetError; use risingwave_common::array::ArrayError; -use risingwave_common::error::BoxedError; +use risingwave_common::error::{def_anyhow_newtype, def_anyhow_variant, BoxedError}; use risingwave_common::util::value_encoding::error::ValueEncodingError; use risingwave_connector::error::ConnectorError; use risingwave_dml::error::DmlError; @@ -114,25 +114,11 @@ pub enum BatchError { DmlError, ), - #[error(transparent)] - Iceberg( - #[from] - #[backtrace] - iceberg::Error, - ), - - #[error(transparent)] - Parquet( - #[from] - #[backtrace] - ParquetError, - ), - - #[error(transparent)] - Postgres( + #[error("External system error: {0}")] + ExternalSystemError( #[from] #[backtrace] - tokio_postgres::Error, + BatchExternalSystemError, ), // Make the ref-counted type to be a variant for easier code structuring. @@ -200,3 +186,13 @@ impl From for BatchError { Self::Connector(value.into()) } } + +// Define an external system error +def_anyhow_variant! { + pub BatchExternalSystemError, + BatchError ExternalSystemError, + + tokio_postgres::Error => "Postgres error", + iceberg::Error => "Iceberg error", + ParquetError => "Parquet error", +} diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs index f1da314c1003e..229cff77e9b9c 100644 --- a/src/batch/src/executor/iceberg_scan.rs +++ b/src/batch/src/executor/iceberg_scan.rs @@ -151,13 +151,10 @@ impl IcebergScanExecutor { .build(); let file_scan_stream = tokio_stream::once(Ok(data_file_scan_task)); - let mut record_batch_stream = reader - .read(Box::pin(file_scan_stream)) - .map_err(BatchError::Iceberg)? 
- .enumerate(); + let mut record_batch_stream = reader.read(Box::pin(file_scan_stream))?.enumerate(); while let Some((index, record_batch)) = record_batch_stream.next().await { - let record_batch = record_batch.map_err(BatchError::Iceberg)?; + let record_batch = record_batch?; let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?; // position delete @@ -195,8 +192,7 @@ impl BoxedExecutorBuilder for IcebergScanExecutorBuilder { source_node.with_properties.clone(), source_node.secret_refs.clone(), ); - let config = ConnectorProperties::extract(options_with_secret.clone(), false) - .map_err(BatchError::connector)?; + let config = ConnectorProperties::extract(options_with_secret.clone(), false)?; let split_list = source_node .split @@ -279,12 +275,10 @@ impl PositionDeleteFilter { let reader = table.reader_builder().with_batch_size(batch_size).build(); - let mut record_batch_stream = reader - .read(Box::pin(position_delete_file_scan_stream)) - .map_err(BatchError::Iceberg)?; + let mut record_batch_stream = reader.read(Box::pin(position_delete_file_scan_stream))?; while let Some(record_batch) = record_batch_stream.next().await { - let record_batch = record_batch.map_err(BatchError::Iceberg)?; + let record_batch = record_batch?; let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?; for row in chunk.rows() { // The schema is fixed. `0` must be `file_path`, `1` must be `pos`. 
@@ -382,12 +376,10 @@ impl EqualityDeleteFilter { let reader = table.reader_builder().with_batch_size(batch_size).build(); let delete_file_scan_stream = tokio_stream::once(Ok(equality_delete_file_scan_task)); - let mut delete_record_batch_stream = reader - .read(Box::pin(delete_file_scan_stream)) - .map_err(BatchError::Iceberg)?; + let mut delete_record_batch_stream = reader.read(Box::pin(delete_file_scan_stream))?; while let Some(record_batch) = delete_record_batch_stream.next().await { - let record_batch = record_batch.map_err(BatchError::Iceberg)?; + let record_batch = record_batch?; let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?; for row in chunk.rows() { diff --git a/src/batch/src/executor/s3_file_scan.rs b/src/batch/src/executor/s3_file_scan.rs index 5096d1d625fa1..a7b0d1bacd791 100644 --- a/src/batch/src/executor/s3_file_scan.rs +++ b/src/batch/src/executor/s3_file_scan.rs @@ -109,7 +109,7 @@ impl S3FileScanExecutor { #[for_await] for record_batch in record_batch_stream { - let record_batch = record_batch.map_err(BatchError::Parquet)?; + let record_batch = record_batch?; let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?; debug_assert_eq!(chunk.data_types(), self.schema.data_types()); yield chunk; diff --git a/src/error/src/anyhow.rs b/src/error/src/anyhow.rs index 3d5d6d927fb58..08203c176fcbc 100644 --- a/src/error/src/anyhow.rs +++ b/src/error/src/anyhow.rs @@ -154,4 +154,30 @@ macro_rules! def_anyhow_newtype { )* }; } -pub use def_anyhow_newtype; + +/// Define a newtype + its variant in the specified type. +/// This is useful when you want to define a new error type, +/// but also want to define a variant for it in another enum. +#[macro_export] +macro_rules! def_anyhow_variant { + ( + $(#[$attr:meta])* $vis:vis $name:ident, + $enum_name:ident $variant_name:ident + $(, $from:ty => $context:tt)* $(,)? + ) => { + def_anyhow_newtype! 
{ + $(#[$attr])* $vis $name + $(, $from => $context)* + } + + $( + impl From<$from> for $enum_name { + fn from(error: $from) -> Self { + $enum_name::$variant_name($name::from(error)) + } + } + )* + } +} + +pub use {def_anyhow_newtype, def_anyhow_variant};