Skip to content

Commit

Permalink
fix more
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <i@bugenzhao.com>
  • Loading branch information
BugenZhao committed Feb 5, 2024
1 parent 417f040 commit 5965695
Show file tree
Hide file tree
Showing 18 changed files with 98 additions and 97 deletions.
23 changes: 12 additions & 11 deletions src/connector/src/sink/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@ use icelake::types::{data_file_from_json, data_file_to_json, Any, DataFile};
use icelake::{Table, TableIdentifier};
use itertools::Itertools;
use risingwave_common::array::{to_iceberg_record_batch_with_schema, Op, StreamChunk};
use risingwave_common::bail;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::Schema;
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use risingwave_pb::connector_service::SinkMetadata;
use serde::de;
use serde_derive::Deserialize;
use thiserror_ext::AsReport;
use url::Url;
use with_options::WithOptions;

Expand Down Expand Up @@ -804,12 +806,12 @@ impl WriteResult {
fn try_from(value: &SinkMetadata, partition_type: &Any) -> Result<Self> {
if let Some(Serialized(v)) = &value.metadata {
let mut values = if let serde_json::Value::Object(v) =
serde_json::from_slice::<serde_json::Value>(&v.metadata).map_err(
|e| -> SinkError { anyhow!("Can't parse iceberg sink metadata: {}", e).into() },
)? {
serde_json::from_slice::<serde_json::Value>(&v.metadata)
.context("Can't parse iceberg sink metadata")?
{
v
} else {
return Err(anyhow!("iceberg sink metadata should be a object").into());
bail!("iceberg sink metadata should be a object");
};

let data_files: Vec<DataFile>;
Expand All @@ -834,7 +836,7 @@ impl WriteResult {
.into_iter()
.map(|value| data_file_from_json(value, partition_type.clone()))
.collect::<std::result::Result<Vec<DataFile>, icelake::Error>>()
.map_err(|e| anyhow!("Failed to parse data file from json: {}", e))?;
.context("Failed to parse data file from json")?;
} else {
return Err(anyhow!("icberg sink metadata should have data_files object").into());
}
Expand All @@ -859,7 +861,7 @@ impl<'a> TryFrom<&'a WriteResult> for SinkMetadata {
.cloned()
.map(data_file_to_json)
.collect::<std::result::Result<Vec<serde_json::Value>, icelake::Error>>()
.map_err(|e| anyhow!("Can't serialize data files to json: {}", e))?,
.context("Can't serialize data files to json")?,
);
let json_delete_files = serde_json::Value::Array(
value
Expand All @@ -868,7 +870,7 @@ impl<'a> TryFrom<&'a WriteResult> for SinkMetadata {
.cloned()
.map(data_file_to_json)
.collect::<std::result::Result<Vec<serde_json::Value>, icelake::Error>>()
.map_err(|e| anyhow!("Can't serialize data files to json: {}", e))?,
.context("Can't serialize data files to json")?,
);
let json_value = serde_json::Value::Object(
vec![
Expand All @@ -880,9 +882,8 @@ impl<'a> TryFrom<&'a WriteResult> for SinkMetadata {
);
Ok(SinkMetadata {
metadata: Some(Serialized(SerializedMetadata {
metadata: serde_json::to_vec(&json_value).map_err(|e| -> SinkError {
anyhow!("Can't serialized iceberg sink metadata: {}", e).into()
})?,
metadata: serde_json::to_vec(&json_value)
.context("Can't serialize iceberg sink metadata")?,
})),
})
}
Expand Down Expand Up @@ -917,7 +918,7 @@ impl SinkCommitCoordinator for IcebergSinkCommitter {
txn.append_delete_file(s.delete_files);
});
txn.commit().await.map_err(|err| {
tracing::error!(?err, "Failed to commit iceberg table");
tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
SinkError::Iceberg(anyhow!(err))
})?;

Expand Down
5 changes: 3 additions & 2 deletions src/connector/src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use risingwave_common::catalog::Schema;
use serde_derive::Deserialize;
use serde_with::{serde_as, DisplayFromStr};
use strum_macros::{Display, EnumString};
use thiserror_ext::AsReport;
use with_options::WithOptions;

use super::catalog::{SinkFormat, SinkFormatDesc};
Expand Down Expand Up @@ -474,10 +475,10 @@ impl<'w> KafkaPayloadWriter<'w> {
// We can retry for another round after sleeping for sometime
Err((e, rec)) => {
tracing::warn!(
"producing message (key {:?}) to topic {} failed, err {:?}.",
error = %e.as_report(),
"producing message (key {:?}) to topic {} failed",
rec.key.map(|k| k.to_bytes()),
rec.topic,
e
);
record = rec;
match e {
Expand Down
5 changes: 3 additions & 2 deletions src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema};
use risingwave_rpc_client::error::RpcError;
use risingwave_rpc_client::MetaClient;
use thiserror::Error;
use thiserror_ext::AsReport;
pub use tracing;

use self::catalog::{SinkFormatDesc, SinkType};
Expand Down Expand Up @@ -547,7 +548,7 @@ impl From<RpcError> for SinkError {

impl From<ClickHouseError> for SinkError {
fn from(value: ClickHouseError) -> Self {
SinkError::ClickHouse(format!("{}", value))
SinkError::ClickHouse(value.to_report_string())
}
}

Expand All @@ -559,6 +560,6 @@ impl From<DeltaTableError> for SinkError {

impl From<RedisError> for SinkError {
fn from(value: RedisError) -> Self {
SinkError::Redis(format!("{}", value))
SinkError::Redis(value.to_report_string())
}
}
8 changes: 5 additions & 3 deletions src/connector/src/sink/nats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use core::fmt::Debug;
use std::collections::HashMap;

use anyhow::anyhow;
use anyhow::{anyhow, Context as _};
use async_nats::jetstream::context::Context;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
Expand Down Expand Up @@ -159,13 +159,15 @@ impl NatsSinkWriter {
self.context
.publish(self.config.common.subject.clone(), item.into())
.await
.map_err(|e| SinkError::Nats(anyhow!("nats sink error: {:?}", e)))?;
.context("nats sink error")
.map_err(SinkError::Nats)?;
}
Ok::<_, SinkError>(())
},
)
.await
.map_err(|e| SinkError::Nats(anyhow!("nats sink error: {:?}", e)))
.context("nats sink error")
.map_err(SinkError::Nats)
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl RedisConfig {
pub fn from_hashmap(properties: HashMap<String, String>) -> Result<Self> {
let config =
serde_json::from_value::<RedisConfig>(serde_json::to_value(properties).unwrap())
.map_err(|e| SinkError::Config(anyhow!("{:?}", e)))?;
.map_err(|e| SinkError::Config(anyhow!(e)))?;
Ok(config)
}
}
Expand Down
11 changes: 7 additions & 4 deletions src/connector/src/sink/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use risingwave_rpc_client::{
DEFAULT_BUFFER_SIZE,
};
use rw_futures_util::drop_either_future;
use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{unbounded_channel, Receiver, Sender};
use tokio::task::spawn_blocking;
Expand Down Expand Up @@ -766,11 +767,13 @@ impl EmbeddedConnectorClient {

let jvm = self.jvm;
std::thread::spawn(move || {
let mut env = match jvm.attach_current_thread() {
let mut env = match jvm
.attach_current_thread()
.context("failed to attach current thread")
{
Ok(env) => env,
Err(e) => {
let _ = response_tx
.blocking_send(Err(anyhow!("failed to attach current thread: {:?}", e)));
let _ = response_tx.blocking_send(Err(e));
return;
}
};
Expand All @@ -789,7 +792,7 @@ impl EmbeddedConnectorClient {
tracing::info!("end of jni call {}::{}", class_name, method_name);
}
Err(e) => {
tracing::error!("jni call error: {:?}", e);
tracing::error!(error = %e.as_report(), "jni call error");
}
};
});
Expand Down
15 changes: 10 additions & 5 deletions src/connector/src/sink/starrocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use serde::Deserialize;
use serde_derive::Serialize;
use serde_json::Value;
use serde_with::serde_as;
use thiserror_ext::AsReport;
use with_options::WithOptions;

use super::doris_starrocks_connector::{
Expand Down Expand Up @@ -322,13 +323,17 @@ impl StarrocksSinkWriter {
.first()
.ok_or_else(|| SinkError::Starrocks("must have next".to_string()))?
.parse::<u8>()
.map_err(|e| SinkError::Starrocks(format!("starrocks sink error {}", e)))?;
.map_err(|e| {
SinkError::Starrocks(format!("starrocks sink error: {}", e.as_report()))
})?;

let scale = decimal_all
.last()
.ok_or_else(|| SinkError::Starrocks("must have next".to_string()))?
.parse::<u8>()
.map_err(|e| SinkError::Starrocks(format!("starrocks sink error {}", e)))?;
.map_err(|e| {
SinkError::Starrocks(format!("starrocks sink error: {}", e.as_report()))
})?;
decimal_map.insert(name.to_string(), (length, scale));
}
}
Expand Down Expand Up @@ -394,7 +399,7 @@ impl StarrocksSinkWriter {
Value::String("0".to_string()),
);
let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| {
SinkError::Starrocks(format!("Json derialize error {:?}", e))
SinkError::Starrocks(format!("Json derialize error: {}", e.as_report()))
})?;
self.client
.as_mut()
Expand All @@ -411,7 +416,7 @@ impl StarrocksSinkWriter {
Value::String("1".to_string()),
);
let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| {
SinkError::Starrocks(format!("Json derialize error {:?}", e))
SinkError::Starrocks(format!("Json derialize error: {}", e.as_report()))
})?;
self.client
.as_mut()
Expand All @@ -429,7 +434,7 @@ impl StarrocksSinkWriter {
Value::String("0".to_string()),
);
let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| {
SinkError::Starrocks(format!("Json derialize error {:?}", e))
SinkError::Starrocks(format!("Json derialize error: {}", e.as_report()))
})?;
self.client
.as_mut()
Expand Down
8 changes: 4 additions & 4 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ pub trait UnknownFields {

impl<P: DeserializeOwned + UnknownFields> TryFromHashmap for P {
fn try_from_hashmap(props: HashMap<String, String>, deny_unknown_fields: bool) -> Result<Self> {
let json_value = serde_json::to_value(props).map_err(|e| anyhow!(e))?;
let res = serde_json::from_value::<P>(json_value).map_err(|e| anyhow!(e.to_string()))?;
let json_value = serde_json::to_value(props)?;
let res = serde_json::from_value::<P>(json_value)?;

if !deny_unknown_fields || res.unknown_fields().is_empty() {
Ok(res)
Expand Down Expand Up @@ -310,8 +310,8 @@ pub fn extract_source_struct(info: &PbStreamSourceInfo) -> Result<SourceStruct>
};
return Ok(SourceStruct::new(format, encode));
}
let source_format = info.get_format().map_err(|e| anyhow!("{e:?}"))?;
let source_encode = info.get_row_encode().map_err(|e| anyhow!("{e:?}"))?;
let source_format = info.get_format()?;
let source_encode = info.get_row_encode()?;
let (format, encode) = match (source_format, source_encode) {
(PbFormatType::Plain, PbEncodeType::Json) => (SourceFormat::Plain, SourceEncode::Json),
(PbFormatType::Plain, PbEncodeType::Protobuf) => {
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/source/cdc/enumerator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::marker::PhantomData;
use std::ops::Deref;
use std::str::FromStr;

use anyhow::anyhow;
use anyhow::{anyhow, Context};
use async_trait::async_trait;
use itertools::Itertools;
use prost::Message;
Expand Down Expand Up @@ -111,7 +111,7 @@ where
)
})
.await
.map_err(|e| anyhow!("failed to validate source: {:?}", e))??;
.context("failed to validate source")??;

tracing::debug!("validate cdc source properties success");
Ok(Self {
Expand Down
18 changes: 8 additions & 10 deletions src/connector/src/source/cdc/external/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ mod postgres;

use std::collections::HashMap;

use anyhow::anyhow;
use anyhow::{anyhow, Context};
use futures::stream::BoxStream;
use futures::{pin_mut, StreamExt};
use futures_async_stream::try_stream;
Expand Down Expand Up @@ -191,19 +191,18 @@ pub struct DebeziumSourceOffset {

impl MySqlOffset {
pub fn parse_debezium_offset(offset: &str) -> ConnectorResult<Self> {
let dbz_offset: DebeziumOffset = serde_json::from_str(offset).map_err(|e| {
ConnectorError::Internal(anyhow!("invalid upstream offset: {}, error: {}", offset, e))
})?;
let dbz_offset: DebeziumOffset = serde_json::from_str(offset)
.with_context(|| format!("invalid upstream offset: {}", offset))?;

Ok(Self {
filename: dbz_offset
.source_offset
.file
.ok_or_else(|| anyhow!("binlog file not found in offset"))?,
.context("binlog file not found in offset")?,
position: dbz_offset
.source_offset
.pos
.ok_or_else(|| anyhow!("binlog position not found in offset"))?,
.context("binlog position not found in offset")?,
})
}
}
Expand Down Expand Up @@ -268,7 +267,8 @@ impl ExternalTableReader for MySqlExternalTableReader {
let row = rs
.iter_mut()
.exactly_one()
.map_err(|e| ConnectorError::Internal(anyhow!("read binlog error: {}", e)))?;
.ok()
.context("expect exactly one row when reading binlog offset")?;

Ok(CdcOffset::MySql(MySqlOffset {
filename: row.take("File").unwrap(),
Expand Down Expand Up @@ -296,9 +296,7 @@ impl MySqlExternalTableReader {
let config = serde_json::from_value::<ExternalTableConfig>(
serde_json::to_value(with_properties).unwrap(),
)
.map_err(|e| {
ConnectorError::Config(anyhow!("fail to extract mysql connector properties: {}", e))
})?;
.context("failed to extract mysql connector properties")?;

let database_url = format!(
"mysql://{}:{}@{}:{}/{}",
Expand Down
Loading

0 comments on commit 5965695

Please sign in to comment.