Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(connector): migrate anyhow::Error to ConnectorError newtype #15042

Merged
merged 11 commits into from
Feb 23, 2024
3 changes: 2 additions & 1 deletion e2e_test/source/basic/ddl.slt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ db error: ERROR: Failed to run the query
Caused by these errors (recent errors listed first):
1: gRPC request to meta service failed: Internal error
2: failed to create source worker
3: missing field `properties.bootstrap.server`
3: failed to parse json
4: missing field `properties.bootstrap.server`


statement error
Expand Down
7 changes: 7 additions & 0 deletions src/batch/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub use anyhow::anyhow;
use risingwave_common::array::ArrayError;
use risingwave_common::error::BoxedError;
use risingwave_common::util::value_encoding::error::ValueEncodingError;
use risingwave_connector::error::ConnectorError;
use risingwave_dml::error::DmlError;
use risingwave_expr::ExprError;
use risingwave_pb::PbFieldNotFound;
Expand Down Expand Up @@ -156,3 +157,9 @@ impl From<BatchError> for Status {
Self::from(&err)
}
}

impl From<ConnectorError> for BatchError {
fn from(value: ConnectorError) -> Self {
Self::Connector(value.into())
}
}
6 changes: 1 addition & 5 deletions src/batch/src/executor/iceberg_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,7 @@ impl IcebergScanExecutor {

#[try_stream(ok = DataChunk, error = BatchError)]
async fn do_execute(self: Box<Self>) {
let table = self
.iceberg_config
.load_table()
.await
.map_err(BatchError::Internal)?;
let table = self.iceberg_config.load_table().await?;

let table_scan: TableScan = table
.new_scan_builder()
Expand Down
1 change: 1 addition & 0 deletions src/common/src/field_generator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod varchar;

use std::time::Duration;

// TODO(error-handling): use a new error type
use anyhow::{anyhow, Result};
use chrono::{DateTime, FixedOffset};
pub use numeric::*;
Expand Down
3 changes: 2 additions & 1 deletion src/connector/src/aws_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use aws_sdk_s3::{client as s3_client, config as s3_config};
use url::Url;

use crate::common::AwsAuthProps;
use crate::error::ConnectorResult;

const AWS_CUSTOM_CONFIG_KEY: [&str; 3] = ["retry_times", "conn_timeout", "read_timeout"];

Expand Down Expand Up @@ -106,7 +107,7 @@ pub fn s3_client(
pub async fn load_file_descriptor_from_s3(
location: &Url,
config: &AwsAuthProps,
) -> anyhow::Result<Vec<u8>> {
) -> ConnectorResult<Vec<u8>> {
let bucket = location
.domain()
.with_context(|| format!("illegal file path {}", location))?;
Expand Down
53 changes: 31 additions & 22 deletions src/connector/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::collections::HashMap;
use std::io::Write;
use std::time::Duration;

use anyhow::{anyhow, Context, Ok};
use anyhow::{anyhow, Context};
use async_nats::jetstream::consumer::DeliverPolicy;
use async_nats::jetstream::{self};
use aws_sdk_kinesis::Client as KinesisClient;
Expand All @@ -35,6 +35,7 @@ use with_options::WithOptions;

use crate::aws_utils::load_file_descriptor_from_s3;
use crate::deserialize_duration_from_string;
use crate::error::ConnectorResult;
use crate::sink::SinkError;
use crate::source::nats::source::NatsOffset;
// The file describes the common abstractions for each connector and can be used in both source and
Expand Down Expand Up @@ -72,7 +73,7 @@ pub struct AwsAuthProps {
}

impl AwsAuthProps {
async fn build_region(&self) -> anyhow::Result<Region> {
async fn build_region(&self) -> ConnectorResult<Region> {
if let Some(region_name) = &self.region {
Ok(Region::new(region_name.clone()))
} else {
Expand All @@ -85,11 +86,11 @@ impl AwsAuthProps {
.build()
.region()
.await
.ok_or_else(|| anyhow::format_err!("region should be provided"))?)
.context("region should be provided")?)
}
}

fn build_credential_provider(&self) -> anyhow::Result<SharedCredentialsProvider> {
fn build_credential_provider(&self) -> ConnectorResult<SharedCredentialsProvider> {
if self.access_key.is_some() && self.secret_key.is_some() {
Ok(SharedCredentialsProvider::new(
aws_credential_types::Credentials::from_keys(
Expand All @@ -99,16 +100,14 @@ impl AwsAuthProps {
),
))
} else {
Err(anyhow!(
"Both \"access_key\" and \"secret_access\" are required."
))
bail!("Both \"access_key\" and \"secret_access\" are required.")
}
}

async fn with_role_provider(
&self,
credential: SharedCredentialsProvider,
) -> anyhow::Result<SharedCredentialsProvider> {
) -> ConnectorResult<SharedCredentialsProvider> {
if let Some(role_name) = &self.arn {
let region = self.build_region().await?;
let mut role = AssumeRoleProvider::builder(role_name)
Expand All @@ -124,7 +123,7 @@ impl AwsAuthProps {
}
}

pub async fn build_config(&self) -> anyhow::Result<SdkConfig> {
pub async fn build_config(&self) -> ConnectorResult<SdkConfig> {
let region = self.build_region().await?;
let credentials_provider = self
.with_role_provider(self.build_credential_provider()?)
Expand Down Expand Up @@ -386,23 +385,30 @@ pub struct PulsarOauthCommon {
pub scope: Option<String>,
}

/// Persists `credentials` into a newly created named temporary file.
///
/// The bytes are written and then fsync'd (`sync_all`) so the content is
/// durable on disk before the path is handed to the Pulsar client.
/// NOTE(review): the file lives only as long as the returned
/// [`NamedTempFile`] handle — callers must keep it alive while in use.
fn create_credential_temp_file(credentials: &[u8]) -> std::io::Result<NamedTempFile> {
    let mut file = NamedTempFile::new()?;
    file.write_all(credentials)?;
    // Flush OS buffers to disk; a partially written credential file would
    // otherwise be possible if the process crashes here.
    file.as_file().sync_all()?;
    Ok(file)
}

impl PulsarCommon {
pub(crate) async fn build_client(
&self,
oauth: &Option<PulsarOauthCommon>,
aws_auth_props: &AwsAuthProps,
) -> anyhow::Result<Pulsar<TokioExecutor>> {
) -> ConnectorResult<Pulsar<TokioExecutor>> {
let mut pulsar_builder = Pulsar::builder(&self.service_url, TokioExecutor);
let mut temp_file = None;
if let Some(oauth) = oauth.as_ref() {
let url = Url::parse(&oauth.credentials_url)?;
match url.scheme() {
"s3" => {
let credentials = load_file_descriptor_from_s3(&url, aws_auth_props).await?;
let mut f = NamedTempFile::new()?;
f.write_all(&credentials)?;
f.as_file().sync_all()?;
temp_file = Some(f);
temp_file = Some(
create_credential_temp_file(&credentials)
.context("failed to create temp file for pulsar credentials")?,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😄

);
}
"file" => {}
_ => {
Expand Down Expand Up @@ -477,7 +483,7 @@ pub struct KinesisCommon {
}

impl KinesisCommon {
pub(crate) async fn build_client(&self) -> anyhow::Result<KinesisClient> {
pub(crate) async fn build_client(&self) -> ConnectorResult<KinesisClient> {
let config = AwsAuthProps {
region: Some(self.stream_region.clone()),
endpoint: self.endpoint.clone(),
Expand Down Expand Up @@ -539,7 +545,7 @@ pub struct NatsCommon {
}

impl NatsCommon {
pub(crate) async fn build_client(&self) -> anyhow::Result<async_nats::Client> {
pub(crate) async fn build_client(&self) -> ConnectorResult<async_nats::Client> {
let mut connect_options = async_nats::ConnectOptions::new();
match self.connect_mode.as_str() {
"user_and_password" => {
Expand Down Expand Up @@ -582,7 +588,7 @@ impl NatsCommon {
Ok(client)
}

pub(crate) async fn build_context(&self) -> anyhow::Result<jetstream::Context> {
pub(crate) async fn build_context(&self) -> ConnectorResult<jetstream::Context> {
let client = self.build_client().await?;
let jetstream = async_nats::jetstream::new(client);
Ok(jetstream)
Expand All @@ -593,7 +599,7 @@ impl NatsCommon {
stream: String,
split_id: String,
start_sequence: NatsOffset,
) -> anyhow::Result<
) -> ConnectorResult<
async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::pull::Config>,
> {
let context = self.build_context().await?;
Expand All @@ -612,13 +618,16 @@ impl NatsCommon {
NatsOffset::Earliest => DeliverPolicy::All,
NatsOffset::Latest => DeliverPolicy::Last,
NatsOffset::SequenceNumber(v) => {
let parsed = v.parse::<u64>()?;
let parsed = v
.parse::<u64>()
.context("failed to parse nats offset as sequence number")?;
DeliverPolicy::ByStartSequence {
start_sequence: 1 + parsed,
}
}
NatsOffset::Timestamp(v) => DeliverPolicy::ByStartTime {
start_time: OffsetDateTime::from_unix_timestamp_nanos(v * 1_000_000)?,
start_time: OffsetDateTime::from_unix_timestamp_nanos(v * 1_000_000)
.context("invalid timestamp for nats offset")?,
},
NatsOffset::None => DeliverPolicy::All,
};
Expand All @@ -635,7 +644,7 @@ impl NatsCommon {
&self,
jetstream: jetstream::Context,
stream: String,
) -> anyhow::Result<jetstream::stream::Stream> {
) -> ConnectorResult<jetstream::stream::Stream> {
let subjects: Vec<String> = self.subject.split(',').map(|s| s.to_string()).collect();
let mut config = jetstream::stream::Config {
name: stream,
Expand All @@ -662,7 +671,7 @@ impl NatsCommon {
Ok(stream)
}

pub(crate) fn create_credential(&self, seed: &str, jwt: &str) -> anyhow::Result<String> {
pub(crate) fn create_credential(&self, seed: &str, jwt: &str) -> ConnectorResult<String> {
let creds = format!(
"-----BEGIN NATS USER JWT-----\n{}\n------END NATS USER JWT------\n\n\
************************* IMPORTANT *************************\n\
Expand Down
46 changes: 45 additions & 1 deletion src/connector/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,57 @@
// limitations under the License.

use risingwave_common::error::v2::def_anyhow_newtype;
use risingwave_pb::PbFieldNotFound;
use risingwave_rpc_client::error::RpcError;

use crate::parser::AccessError;
use crate::schema::schema_registry::{ConcurrentRequestError, WireFormatError};
use crate::schema::InvalidOptionError;
use crate::sink::SinkError;

def_anyhow_newtype! {
pub ConnectorError,

// Common errors
std::io::Error => transparent,

// Fine-grained connector errors
AccessError => transparent,
WireFormatError => transparent,
ConcurrentRequestError => transparent,
InvalidOptionError => transparent,
SinkError => transparent,
PbFieldNotFound => transparent,

// TODO(error-handling): Remove implicit contexts below and specify ad-hoc context for each conversion.

// Parsing errors
url::ParseError => "failed to parse url",
serde_json::Error => "failed to parse json",
csv::Error => "failed to parse csv",

// Connector errors
opendal::Error => transparent, // believed to be self-explanatory

mysql_async::Error => "MySQL error",
tokio_postgres::Error => "Postgres error",
apache_avro::Error => "Avro error",
rdkafka::error::KafkaError => "Kafka error",
pulsar::Error => "Pulsar error",
async_nats::jetstream::consumer::StreamError => "Nats error",
async_nats::jetstream::consumer::pull::MessagesError => "Nats error",
async_nats::jetstream::context::CreateStreamError => "Nats error",
async_nats::jetstream::stream::ConsumerError => "Nats error",
icelake::Error => "Iceberg error",
redis::RedisError => "Redis error",
arrow_schema::ArrowError => "Arrow error",
google_cloud_pubsub::client::google_cloud_auth::error::Error => "Google Cloud error",
}

pub type ConnectorResult<T> = Result<T, ConnectorError>;
pub type ConnectorResult<T, E = ConnectorError> = std::result::Result<T, E>;

impl From<ConnectorError> for RpcError {
fn from(value: ConnectorError) -> Self {
RpcError::Internal(value.0)
}
}
4 changes: 2 additions & 2 deletions src/connector/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,12 @@ macro_rules! impl_split {

$(
impl TryFrom<SplitImpl> for $split {
type Error = anyhow::Error;
type Error = $crate::error::ConnectorError;

fn try_from(split: SplitImpl) -> std::result::Result<Self, Self::Error> {
match split {
SplitImpl::$variant_name(inner) => Ok(inner),
other => Err(anyhow::anyhow!("expect {} but get {:?}", stringify!($split), other))
other => risingwave_common::bail!("expect {} but get {:?}", stringify!($split), other),
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/connector/src/parser/additional_columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use risingwave_pb::plan_common::{
AdditionalColumnTimestamp,
};

use crate::error::ConnectorResult;
use crate::source::{
GCS_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, OPENDAL_S3_CONNECTOR, PULSAR_CONNECTOR,
S3_CONNECTOR,
Expand Down Expand Up @@ -86,7 +87,7 @@ pub fn build_additional_column_catalog(
inner_field_name: Option<&str>,
data_type: Option<&str>,
reject_unknown_connector: bool,
) -> anyhow::Result<ColumnCatalog> {
) -> ConnectorResult<ColumnCatalog> {
let compatible_columns = match (
COMPATIBLE_ADDITIONAL_COLUMNS.get(connector_name),
reject_unknown_connector,
Expand Down
Loading
Loading