From fcbe69d8faef57efeff85153c19a2b3f642663dd Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 5 Feb 2024 15:28:54 +0800 Subject: [PATCH] fix more Signed-off-by: Bugen Zhao --- src/connector/src/sink/iceberg/mod.rs | 23 ++++++------- src/connector/src/sink/kafka.rs | 5 +-- src/connector/src/sink/mod.rs | 5 +-- src/connector/src/sink/nats.rs | 8 +++-- src/connector/src/sink/redis.rs | 2 +- src/connector/src/sink/remote.rs | 11 ++++--- src/connector/src/sink/starrocks.rs | 15 ++++++--- src/connector/src/source/base.rs | 8 ++--- .../src/source/cdc/enumerator/mod.rs | 4 +-- src/connector/src/source/cdc/external/mod.rs | 18 +++++------ .../src/source/cdc/external/postgres.rs | 21 +++++------- src/connector/src/source/cdc/source/reader.rs | 3 +- src/connector/src/source/cdc/split.rs | 32 +++++++++---------- .../src/source/datagen/source/reader.rs | 16 +++++----- .../src/source/filesystem/s3/source/reader.rs | 6 +--- .../source/google_pubsub/enumerator/client.rs | 10 +++--- .../src/source/google_pubsub/source/reader.rs | 4 +-- .../src/source/kafka/source/reader.rs | 4 +-- 18 files changed, 98 insertions(+), 97 deletions(-) diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index e1859df827974..68c5654533a64 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -40,6 +40,7 @@ use icelake::types::{data_file_from_json, data_file_to_json, Any, DataFile}; use icelake::{Table, TableIdentifier}; use itertools::Itertools; use risingwave_common::array::{to_iceberg_record_batch_with_schema, Op, StreamChunk}; +use risingwave_common::bail; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized; @@ -47,6 +48,7 @@ use risingwave_pb::connector_service::sink_metadata::SerializedMetadata; use risingwave_pb::connector_service::SinkMetadata; use serde::de; use serde_derive::Deserialize; +use thiserror_ext::AsReport; use url::Url; use with_options::WithOptions; @@ -803,12 +805,12 @@ impl WriteResult { fn try_from(value: &SinkMetadata, partition_type: &Any) -> Result { if let Some(Serialized(v)) = &value.metadata { let mut values = if let serde_json::Value::Object(v) = - serde_json::from_slice::(&v.metadata).map_err( - |e| -> SinkError { anyhow!("Can't parse iceberg sink metadata: {}", e).into() }, - )? { + serde_json::from_slice::(&v.metadata) + .context("Can't parse iceberg sink metadata")? + { v } else { - return Err(anyhow!("iceberg sink metadata should be a object").into()); + bail!("iceberg sink metadata should be a object"); }; let data_files: Vec; @@ -833,7 +835,7 @@ impl WriteResult { .into_iter() .map(|value| data_file_from_json(value, partition_type.clone())) .collect::, icelake::Error>>() - .map_err(|e| anyhow!("Failed to parse data file from json: {}", e))?; + .context("Failed to parse data file from json")?; } else { return Err(anyhow!("icberg sink metadata should have data_files object").into()); } @@ -858,7 +860,7 @@ impl<'a> TryFrom<&'a WriteResult> for SinkMetadata { .cloned() .map(data_file_to_json) .collect::, icelake::Error>>() - .map_err(|e| anyhow!("Can't serialize data files to json: {}", e))?, + .context("Can't serialize data files to json")?, ); let json_delete_files = serde_json::Value::Array( value @@ -867,7 +869,7 @@ impl<'a> TryFrom<&'a WriteResult> for SinkMetadata { .cloned() .map(data_file_to_json) .collect::, icelake::Error>>() - .map_err(|e| anyhow!("Can't serialize data files to json: {}", e))?, + .context("Can't serialize data files to json")?, ); let json_value = serde_json::Value::Object( vec![ @@ -879,9 +881,8 @@ impl<'a> TryFrom<&'a WriteResult> for SinkMetadata { ); Ok(SinkMetadata { metadata: Some(Serialized(SerializedMetadata { - metadata: serde_json::to_vec(&json_value).map_err(|e| -> SinkError { - anyhow!("Can't serialized iceberg sink metadata: {}", e).into() - })?, + metadata: serde_json::to_vec(&json_value) + .context("Can't serialize iceberg sink metadata")?, })), }) } @@ -916,7 +917,7 @@ impl SinkCommitCoordinator for IcebergSinkCommitter { txn.append_delete_file(s.delete_files); }); txn.commit().await.map_err(|err| { - tracing::error!(?err, "Failed to commit iceberg table"); + tracing::error!(error = %err.as_report(), "Failed to commit iceberg table"); SinkError::Iceberg(anyhow!(err)) })?; diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 25681125f9069..6d5407578b29d 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -29,6 +29,7 @@ use risingwave_common::catalog::Schema; use serde_derive::Deserialize; use serde_with::{serde_as, DisplayFromStr}; use strum_macros::{Display, EnumString}; +use thiserror_ext::AsReport; use with_options::WithOptions; use super::catalog::{SinkFormat, SinkFormatDesc}; @@ -478,10 +479,10 @@ impl<'w> KafkaPayloadWriter<'w> { // We can retry for another round after sleeping for sometime Err((e, rec)) => { tracing::warn!( - "producing message (key {:?}) to topic {} failed, err {:?}.", + error = %e.as_report(), + "producing message (key {:?}) to topic {} failed", rec.key.map(|k| k.to_bytes()), rec.topic, - e ); record = rec; match e { diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 26e946fa06d5e..fc6712f17604b 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -56,6 +56,7 @@ use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema}; use risingwave_rpc_client::error::RpcError; use risingwave_rpc_client::MetaClient; use thiserror::Error; +use thiserror_ext::AsReport; pub use tracing; use self::catalog::{SinkFormatDesc, SinkType}; @@ -547,7 +548,7 @@ impl From for SinkError { impl From for SinkError { fn from(value: ClickHouseError) -> Self { - SinkError::ClickHouse(format!("{}", value)) + SinkError::ClickHouse(value.to_report_string()) } } @@ -559,6 +560,6 @@ impl From for SinkError { impl From for SinkError { fn from(value: RedisError) -> Self { - SinkError::Redis(format!("{}", value)) + SinkError::Redis(value.to_report_string()) } } diff --git a/src/connector/src/sink/nats.rs b/src/connector/src/sink/nats.rs index 9f906b49fbd21..7a97771dee8ef 100644 --- a/src/connector/src/sink/nats.rs +++ b/src/connector/src/sink/nats.rs @@ -14,7 +14,7 @@ use core::fmt::Debug; use std::collections::HashMap; -use anyhow::anyhow; +use anyhow::{anyhow, Context as _}; use async_nats::jetstream::context::Context; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; @@ -159,13 +159,15 @@ impl NatsSinkWriter { self.context .publish(self.config.common.subject.clone(), item.into()) .await - .map_err(|e| SinkError::Nats(anyhow!("nats sink error: {:?}", e)))?; + .context("nats sink error") + .map_err(SinkError::Nats)?; } Ok::<_, SinkError>(()) }, ) .await - .map_err(|e| SinkError::Nats(anyhow!("nats sink error: {:?}", e))) + .context("nats sink error") + .map_err(SinkError::Nats) } } diff --git a/src/connector/src/sink/redis.rs b/src/connector/src/sink/redis.rs index 344201981fd5c..d79d67e4adc2e 100644 --- a/src/connector/src/sink/redis.rs +++ b/src/connector/src/sink/redis.rs @@ -63,7 +63,7 @@ impl RedisConfig { pub fn from_hashmap(properties: HashMap) -> Result { let config = serde_json::from_value::(serde_json::to_value(properties).unwrap()) - .map_err(|e| SinkError::Config(anyhow!("{:?}", e)))?; + .map_err(|e| SinkError::Config(anyhow!(e)))?; Ok(config) } } diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 943dd46a565b2..dfc3bed0e372c 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -48,6 +48,7 @@ use risingwave_rpc_client::{ DEFAULT_BUFFER_SIZE, }; use rw_futures_util::drop_either_future; +use thiserror_ext::AsReport; use tokio::sync::mpsc; use tokio::sync::mpsc::{unbounded_channel, Receiver, Sender}; use tokio::task::spawn_blocking; @@ -766,11 +767,13 @@ impl EmbeddedConnectorClient { let jvm = self.jvm; std::thread::spawn(move || { - let mut env = match jvm.attach_current_thread() { + let mut env = match jvm + .attach_current_thread() + .context("failed to attach current thread") + { Ok(env) => env, Err(e) => { - let _ = response_tx - .blocking_send(Err(anyhow!("failed to attach current thread: {:?}", e))); + let _ = response_tx.blocking_send(Err(e)); return; } }; @@ -789,7 +792,7 @@ impl EmbeddedConnectorClient { tracing::info!("end of jni call {}::{}", class_name, method_name); } Err(e) => { - tracing::error!("jni call error: {:?}", e); + tracing::error!(error = %e.as_report(), "jni call error"); } }; }); diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index d1c1f97f6a60c..4c9460abc431d 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -29,6 +29,7 @@ use serde::Deserialize; use serde_derive::Serialize; use serde_json::Value; use serde_with::serde_as; +use thiserror_ext::AsReport; use with_options::WithOptions; use super::doris_starrocks_connector::{ @@ -322,13 +323,17 @@ impl StarrocksSinkWriter { .first() .ok_or_else(|| SinkError::Starrocks("must have next".to_string()))? .parse::() - .map_err(|e| SinkError::Starrocks(format!("starrocks sink error {}", e)))?; + .map_err(|e| { + SinkError::Starrocks(format!("starrocks sink error: {}", e.as_report())) + })?; let scale = decimal_all .last() .ok_or_else(|| SinkError::Starrocks("must have next".to_string()))? .parse::() - .map_err(|e| SinkError::Starrocks(format!("starrocks sink error {}", e)))?; + .map_err(|e| { + SinkError::Starrocks(format!("starrocks sink error: {}", e.as_report())) + })?; decimal_map.insert(name.to_string(), (length, scale)); } } @@ -394,7 +399,7 @@ impl StarrocksSinkWriter { Value::String("0".to_string()), ); let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| { - SinkError::Starrocks(format!("Json derialize error {:?}", e)) + SinkError::Starrocks(format!("Json derialize error: {}", e.as_report())) })?; self.client .as_mut() @@ -411,7 +416,7 @@ impl StarrocksSinkWriter { Value::String("1".to_string()), ); let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| { - SinkError::Starrocks(format!("Json derialize error {:?}", e)) + SinkError::Starrocks(format!("Json derialize error: {}", e.as_report())) })?; self.client .as_mut() @@ -429,7 +434,7 @@ impl StarrocksSinkWriter { Value::String("0".to_string()), ); let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| { - SinkError::Starrocks(format!("Json derialize error {:?}", e)) + SinkError::Starrocks(format!("Json derialize error: {}", e.as_report())) })?; self.client .as_mut() diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index b0e556c52b3be..5b909a2738f3c 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -85,8 +85,8 @@ pub trait UnknownFields { impl TryFromHashmap for P { fn try_from_hashmap(props: HashMap, deny_unknown_fields: bool) -> Result { - let json_value = serde_json::to_value(props).map_err(|e| anyhow!(e))?; - let res = serde_json::from_value::

(json_value).map_err(|e| anyhow!(e.to_string()))?; + let json_value = serde_json::to_value(props)?; + let res = serde_json::from_value::

(json_value)?; if !deny_unknown_fields || res.unknown_fields().is_empty() { Ok(res) @@ -310,8 +310,8 @@ pub fn extract_source_struct(info: &PbStreamSourceInfo) -> Result }; return Ok(SourceStruct::new(format, encode)); } - let source_format = info.get_format().map_err(|e| anyhow!("{e:?}"))?; - let source_encode = info.get_row_encode().map_err(|e| anyhow!("{e:?}"))?; + let source_format = info.get_format()?; + let source_encode = info.get_row_encode()?; let (format, encode) = match (source_format, source_encode) { (PbFormatType::Plain, PbEncodeType::Json) => (SourceFormat::Plain, SourceEncode::Json), (PbFormatType::Plain, PbEncodeType::Protobuf) => { diff --git a/src/connector/src/source/cdc/enumerator/mod.rs b/src/connector/src/source/cdc/enumerator/mod.rs index 1664640eef03f..58bc42e537578 100644 --- a/src/connector/src/source/cdc/enumerator/mod.rs +++ b/src/connector/src/source/cdc/enumerator/mod.rs @@ -16,7 +16,7 @@ use std::marker::PhantomData; use std::ops::Deref; use std::str::FromStr; -use anyhow::anyhow; +use anyhow::{anyhow, Context}; use async_trait::async_trait; use itertools::Itertools; use prost::Message; @@ -111,7 +111,7 @@ where ) }) .await - .map_err(|e| anyhow!("failed to validate source: {:?}", e))??; + .context("failed to validate source")??; tracing::debug!("validate cdc source properties success"); Ok(Self { diff --git a/src/connector/src/source/cdc/external/mod.rs b/src/connector/src/source/cdc/external/mod.rs index 1d0c0e3974404..78c6c714e2bc6 100644 --- a/src/connector/src/source/cdc/external/mod.rs +++ b/src/connector/src/source/cdc/external/mod.rs @@ -17,7 +17,7 @@ mod postgres; use std::collections::HashMap; -use anyhow::anyhow; +use anyhow::{anyhow, Context}; use futures::stream::BoxStream; use futures::{pin_mut, StreamExt}; use futures_async_stream::try_stream; @@ -191,19 +191,18 @@ pub struct DebeziumSourceOffset { impl MySqlOffset { pub fn parse_debezium_offset(offset: &str) -> ConnectorResult { - let dbz_offset: DebeziumOffset = serde_json::from_str(offset).map_err(|e| { - ConnectorError::Internal(anyhow!("invalid upstream offset: {}, error: {}", offset, e)) - })?; + let dbz_offset: DebeziumOffset = serde_json::from_str(offset) + .with_context(|| format!("invalid upstream offset: {}", offset))?; Ok(Self { filename: dbz_offset .source_offset .file - .ok_or_else(|| anyhow!("binlog file not found in offset"))?, + .context("binlog file not found in offset")?, position: dbz_offset .source_offset .pos - .ok_or_else(|| anyhow!("binlog position not found in offset"))?, + .context("binlog position not found in offset")?, }) } } @@ -268,7 +267,8 @@ impl ExternalTableReader for MySqlExternalTableReader { let row = rs .iter_mut() .exactly_one() - .map_err(|e| ConnectorError::Internal(anyhow!("read binlog error: {}", e)))?; + .ok() + .context("expect exactly one row when reading binlog offset")?; Ok(CdcOffset::MySql(MySqlOffset { filename: row.take("File").unwrap(), @@ -296,9 +296,7 @@ impl MySqlExternalTableReader { let config = serde_json::from_value::( serde_json::to_value(with_properties).unwrap(), ) - .map_err(|e| { - ConnectorError::Config(anyhow!("fail to extract mysql connector properties: {}", e)) - })?; + .context("failed to extract mysql connector properties")?; let database_url = format!( "mysql://{}:{}@{}:{}/{}", diff --git a/src/connector/src/source/cdc/external/postgres.rs b/src/connector/src/source/cdc/external/postgres.rs index 036e62abfe129..f8f0c9d402347 100644 --- a/src/connector/src/source/cdc/external/postgres.rs +++ b/src/connector/src/source/cdc/external/postgres.rs @@ -15,7 +15,7 @@ use std::cmp::Ordering; use std::collections::HashMap; -use anyhow::anyhow; +use anyhow::Context; use futures::stream::BoxStream; use futures::{pin_mut, StreamExt}; use futures_async_stream::try_stream; @@ -24,6 +24,7 @@ use risingwave_common::catalog::Schema; use risingwave_common::row::{OwnedRow, Row}; use risingwave_common::types::DatumRef; use serde_derive::{Deserialize, Serialize}; +use thiserror_ext::AsReport; use tokio_postgres::types::PgLsn; use tokio_postgres::NoTls; @@ -51,19 +52,18 @@ impl PartialOrd for PostgresOffset { impl PostgresOffset { pub fn parse_debezium_offset(offset: &str) -> ConnectorResult { - let dbz_offset: DebeziumOffset = serde_json::from_str(offset).map_err(|e| { - ConnectorError::Internal(anyhow!("invalid upstream offset: {}, error: {}", offset, e)) - })?; + let dbz_offset: DebeziumOffset = serde_json::from_str(offset) + .with_context(|| format!("invalid upstream offset: {}", offset))?; Ok(Self { txid: dbz_offset .source_offset .txid - .ok_or_else(|| anyhow!("invalid postgres txid"))?, + .context("invalid postgres txid")?, lsn: dbz_offset .source_offset .lsn - .ok_or_else(|| anyhow!("invalid postgres lsn"))?, + .context("invalid postgres lsn")?, }) } } @@ -125,12 +125,7 @@ impl PostgresExternalTableReader { let config = serde_json::from_value::( serde_json::to_value(properties).unwrap(), ) - .map_err(|e| { - ConnectorError::Config(anyhow!( - "fail to extract postgres connector properties: {}", - e - )) - })?; + .context("failed to extract postgres connector properties")?; let database_url = format!( "postgresql://{}:{}@{}:{}/{}", @@ -141,7 +136,7 @@ impl PostgresExternalTableReader { tokio::spawn(async move { if let Err(e) = connection.await { - tracing::error!("connection error: {}", e); + tracing::error!(error = %e.as_report(), "postgres connection error"); } }); diff --git a/src/connector/src/source/cdc/source/reader.rs b/src/connector/src/source/cdc/source/reader.rs index 19f7ca55cd302..c21d579df7778 100644 --- a/src/connector/src/source/cdc/source/reader.rs +++ b/src/connector/src/source/cdc/source/reader.rs @@ -26,6 +26,7 @@ use risingwave_jni_core::{call_static_method, JniReceiverType, JniSenderType}; use risingwave_pb::connector_service::{ GetEventStreamRequest, GetEventStreamResponse, SourceCommonParam, }; +use thiserror_ext::AsReport; use tokio::sync::mpsc; use crate::parser::ParserConfig; @@ -137,7 +138,7 @@ impl SplitReader for CdcSplitReader { tracing::info!(?source_id, "end of jni call runJniDbzSourceThread"); } Err(e) => { - tracing::error!(?source_id, "jni call error: {:?}", e); + tracing::error!(?source_id, error = %e.as_report(), "jni call error"); } } }); diff --git a/src/connector/src/source/cdc/split.rs b/src/connector/src/source/cdc/split.rs index 4c46b27be75e8..a7357d231b78b 100644 --- a/src/connector/src/source/cdc/split.rs +++ b/src/connector/src/source/cdc/split.rs @@ -14,7 +14,7 @@ use std::marker::PhantomData; -use anyhow::anyhow; +use anyhow::{anyhow, Context}; use risingwave_common::types::JsonbVal; use serde::{Deserialize, Serialize}; @@ -66,14 +66,13 @@ impl MySqlCdcSplit { pub fn update_with_offset(&mut self, start_offset: String) -> anyhow::Result<()> { let mut snapshot_done = self.inner.snapshot_done; if !snapshot_done { - let dbz_offset: DebeziumOffset = serde_json::from_str(&start_offset).map_err(|e| { - anyhow!( - "invalid mysql offset: {}, error: {}, split: {}", - start_offset, - e, - self.inner.split_id - ) - })?; + let dbz_offset: DebeziumOffset = + serde_json::from_str(&start_offset).with_context(|| { + format!( + "invalid mysql offset: {}, split: {}", + start_offset, self.inner.split_id + ) + })?; // heartbeat event should not update the `snapshot_done` flag if !dbz_offset.is_heartbeat { @@ -106,14 +105,13 @@ impl PostgresCdcSplit { pub fn update_with_offset(&mut self, start_offset: String) -> anyhow::Result<()> { let mut snapshot_done = self.inner.snapshot_done; if !snapshot_done { - let dbz_offset: DebeziumOffset = serde_json::from_str(&start_offset).map_err(|e| { - anyhow!( - "invalid postgres offset: {}, error: {}, split: {}", - start_offset, - e, - self.inner.split_id - ) - })?; + let dbz_offset: DebeziumOffset = + serde_json::from_str(&start_offset).with_context(|| { + format!( + "invalid postgres offset: {}, split: {}", + start_offset, self.inner.split_id + ) + })?; // heartbeat event should not update the `snapshot_done` flag if !dbz_offset.is_heartbeat { diff --git a/src/connector/src/source/datagen/source/reader.rs b/src/connector/src/source/datagen/source/reader.rs index 2bef27ef95fb0..2e1b5f7917261 100644 --- a/src/connector/src/source/datagen/source/reader.rs +++ b/src/connector/src/source/datagen/source/reader.rs @@ -14,10 +14,11 @@ use std::collections::HashMap; -use anyhow::{anyhow, Result}; +use anyhow::{Context, Result}; use async_trait::async_trait; use futures::{Stream, StreamExt, TryStreamExt}; use risingwave_common::field_generator::{FieldGeneratorImpl, VarcharProperty}; +use thiserror_ext::AsReport; use super::generator::DatagenEventGenerator; use crate::parser::{EncodingProperties, ParserConfig, ProtocolProperties}; @@ -209,9 +210,9 @@ fn generator_from_data_type( Ok(seed) => seed ^ split_index, Err(e) => { tracing::warn!( - "cannot parse {:?} to u64 due to {:?}, will use {:?} as random seed", + error = %e.as_report(), + "cannot parse {:?} to u64, will use {:?} as random seed", seed, - e, split_index ); split_index @@ -230,11 +231,10 @@ fn generator_from_data_type( .map(|s| s.to_lowercase()); let basetime = match fields_option_map.get(format!("fields.{}.basetime", name).as_str()) { - Some(base) => { - Some(chrono::DateTime::parse_from_rfc3339(base).map_err(|e| { - anyhow!("cannot parse {:?} to rfc3339 due to {:?}", base, e) - })?) - } + Some(base) => Some( + chrono::DateTime::parse_from_rfc3339(base) + .with_context(|| format!("cannot parse `{base}` to rfc3339"))?, + ), None => None, }; diff --git a/src/connector/src/source/filesystem/s3/source/reader.rs b/src/connector/src/source/filesystem/s3/source/reader.rs index b8e7a2a71b0cd..884f1d19062ce 100644 --- a/src/connector/src/source/filesystem/s3/source/reader.rs +++ b/src/connector/src/source/filesystem/s3/source/reader.rs @@ -85,11 +85,7 @@ impl S3FileReader { return Ok(()); } Err(e) => { - return Err(anyhow!( - "S3 GetObject from {} error: {}", - bucket_name, - e.to_string() - )); + return Err(anyhow!(e).context(format!("S3 GetObject from {bucket_name} error"))); } }; diff --git a/src/connector/src/source/google_pubsub/enumerator/client.rs b/src/connector/src/source/google_pubsub/enumerator/client.rs index 01809a3c773b0..bc1d9d078b66a 100644 --- a/src/connector/src/source/google_pubsub/enumerator/client.rs +++ b/src/connector/src/source/google_pubsub/enumerator/client.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use anyhow::{anyhow, bail}; +use anyhow::{bail, Context}; use async_trait::async_trait; use chrono::{TimeZone, Utc}; use google_cloud_pubsub::client::{Client, ClientConfig}; @@ -49,13 +49,13 @@ impl SplitEnumerator for PubsubSplitEnumerator { let config = ClientConfig::default().with_auth().await?; let client = Client::new(config) .await - .map_err(|e| anyhow!("error initializing pubsub client: {:?}", e))?; + .context("error initializing pubsub client")?; let sub = client.subscription(&subscription); if !sub .exists(None) .await - .map_err(|e| anyhow!("error checking subscription validity: {:?}", e))? + .context("error checking subscription validity")? { bail!("subscription {} does not exist", &subscription) } @@ -76,7 +76,7 @@ impl SplitEnumerator for PubsubSplitEnumerator { (Some(start_offset), None) => { let ts = start_offset .parse::() - .map_err(|e| anyhow!("error parsing start_offset: {:?}", e)) + .context("error parsing start_offset") .map(|nanos| Utc.timestamp_nanos(nanos).into())?; Some(SeekTo::Timestamp(ts)) } @@ -89,7 +89,7 @@ impl SplitEnumerator for PubsubSplitEnumerator { if let Some(seek_to) = seek_to { sub.seek(seek_to, None) .await - .map_err(|e| anyhow!("error seeking subscription: {:?}", e))?; + .context("error seeking subscription")?; } Ok(Self { diff --git a/src/connector/src/source/google_pubsub/source/reader.rs b/src/connector/src/source/google_pubsub/source/reader.rs index d18fcb0be258b..fd5fab15ed10b 100644 --- a/src/connector/src/source/google_pubsub/source/reader.rs +++ b/src/connector/src/source/google_pubsub/source/reader.rs @@ -135,12 +135,12 @@ impl SplitReader for PubsubSplitReader { .as_str() .parse::() .map(|nanos| Utc.timestamp_nanos(nanos)) - .map_err(|e| anyhow!("error parsing offset: {:?}", e))?; + .context("error parsing offset")?; subscription .seek(SeekTo::Timestamp(timestamp.into()), None) .await - .map_err(|e| anyhow!("error seeking to pubsub offset: {:?}", e))?; + .context("error seeking to pubsub offset")?; } let stop_offset = if let Some(ref offset) = split.stop_offset { diff --git a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs index 691590e361cde..bb8e70471282f 100644 --- a/src/connector/src/source/kafka/source/reader.rs +++ b/src/connector/src/source/kafka/source/reader.rs @@ -17,7 +17,7 @@ use std::collections::HashMap; use std::mem::swap; use std::time::Duration; -use anyhow::{anyhow, Result}; +use anyhow::{Context, Result}; use async_trait::async_trait; use futures::StreamExt; use futures_async_stream::try_stream; @@ -98,7 +98,7 @@ impl SplitReader for KafkaSplitReader { .set_log_level(RDKafkaLogLevel::Info) .create_with_context(client_ctx) .await - .map_err(|e| anyhow!("failed to create kafka consumer: {}", e))?; + .context("failed to create kafka consumer")?; let mut tpl = TopicPartitionList::with_capacity(splits.len());