fix some
Signed-off-by: Bugen Zhao <i@bugenzhao.com>
BugenZhao committed Feb 5, 2024
1 parent b38e8b6 commit 278f5d9
Showing 12 changed files with 67 additions and 62 deletions.
1 change: 0 additions & 1 deletion src/connector/src/lib.rs
@@ -33,7 +33,6 @@
#![feature(error_generic_member_access)]
#![feature(register_tool)]
#![register_tool(rw)]
#![allow(rw::format_error)] // TODO(error-handling): need further refactoring

use std::time::Duration;

3 changes: 2 additions & 1 deletion src/connector/src/parser/mod.rs
@@ -36,6 +36,7 @@ use risingwave_pb::catalog::{
SchemaRegistryNameStrategy as PbSchemaRegistryNameStrategy, StreamSourceInfo,
};
use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
use thiserror_ext::AsReport;

use self::avro::AvroAccessBuilder;
use self::bytes_parser::BytesAccessBuilder;
@@ -412,7 +413,7 @@ impl SourceStreamChunkRowWriter<'_> {
LazyLock::new(LogSuppresser::default);
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::warn!(
%error,
error = %error.as_report(),
split_id = self.row_meta.as_ref().map(|m| m.split_id),
offset = self.row_meta.as_ref().map(|m| m.offset),
column = desc.name,
21 changes: 11 additions & 10 deletions src/connector/src/parser/mysql.rs
@@ -23,6 +23,7 @@ use risingwave_common::types::{
DataType, Date, Decimal, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz,
};
use rust_decimal::Decimal as RustDecimal;
use thiserror_ext::AsReport;

static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);

@@ -102,12 +103,12 @@ pub fn mysql_row_to_owned_row(mysql_row: &mut MysqlRow, schema: &Schema) -> Owne
ScalarImpl::from(Timestamptz::from_micros(v.timestamp_micros()))
}),
Err(err) => {
if let Ok(suppressed) = LOG_SUPPERSSER.check() {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
"parse column `{}` fail: {} ({} suppressed)",
name,
err,
suppressed
suppressed_count,
column_name = name,
error = %err.as_report(),
"parse column failed",
);
}
None
@@ -121,12 +122,12 @@ pub fn mysql_row_to_owned_row(mysql_row: &mut MysqlRow, schema: &Schema) -> Owne
match res {
Ok(val) => val.map(|v| ScalarImpl::from(v.into_boxed_slice())),
Err(err) => {
if let Ok(suppressed) = LOG_SUPPERSSER.check() {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
"parse column `{}` fail: {} ({} suppressed)",
name,
err,
suppressed
suppressed_count,
column_name = name,
error = %err.as_report(),
"parse column failed",
);
}
None
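
The parser changes above all follow the same pattern: instead of interpolating the error and the column name into the log message with {} or {:?}, they become structured tracing fields, and the error is rendered through as_report() so its source chain is included. A minimal sketch of the resulting call shape — the column_name value and the surrounding function are illustrative, not taken from the codebase:

use thiserror_ext::AsReport;

// Log a parse failure the way the updated mysql/postgres parsers do:
// values go into structured fields rather than into the message string.
fn log_parse_failure(raw: &str) {
    if let Err(err) = raw.parse::<u8>() {
        tracing::error!(
            column_name = "age",          // illustrative column name
            error = %err.as_report(),     // Display of the error plus its source chain
            "parse column failed",
        );
    }
}
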
17 changes: 9 additions & 8 deletions src/connector/src/parser/postgres.rs
@@ -23,6 +23,7 @@ use risingwave_common::types::{
Timestamptz,
};
use rust_decimal::Decimal as RustDecimal;
use thiserror_ext::AsReport;

static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);

@@ -159,10 +160,10 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
Err(err) => {
if let Ok(sc) = LOG_SUPPERSSER.check() {
tracing::error!(
"parse column \"{}\" fail: {} ({} suppressed)",
name,
err,
sc
suppressed_count = sc,
column_name = name,
error = %err.as_report(),
"parse column failed",
);
}
None
@@ -256,10 +257,10 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
Err(err) => {
if let Ok(sc) = LOG_SUPPERSSER.check() {
tracing::error!(
"parse column \"{}\" fail: {} ({} suppressed)",
name,
err,
sc
suppressed_count = sc,
column_name = name,
error = %err.as_report(),
"parse column failed",
);
}
}
3 changes: 2 additions & 1 deletion src/connector/src/parser/unified/json.rs
@@ -29,6 +29,7 @@ use simd_json::prelude::{
TypedValue, ValueAsContainer, ValueAsScalar, ValueObjectAccess, ValueTryAsScalar,
};
use simd_json::{BorrowedValue, ValueType};
use thiserror_ext::AsReport;

use super::{Access, AccessError, AccessResult};
use crate::parser::common::json_object_get_case_insensitive;
@@ -468,7 +469,7 @@ impl JsonParseOptions {
// TODO: is it possible to unify the logging with the one in `do_action`?
static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::warn!(%error, suppressed_count, "undefined nested field, padding with `NULL`");
tracing::warn!(error = %error.as_report(), suppressed_count, "undefined nested field, padding with `NULL`");
}
&BorrowedValue::Static(simd_json::StaticNode::Null)
});
7 changes: 4 additions & 3 deletions src/connector/src/sink/clickhouse.rs
@@ -26,6 +26,7 @@ use serde::ser::{SerializeSeq, SerializeStruct};
use serde::Serialize;
use serde_derive::Deserialize;
use serde_with::serde_as;
use thiserror_ext::AsReport;
use with_options::WithOptions;

use super::{DummySinkCommitCoordinator, SinkWriterParam};
@@ -436,7 +437,7 @@ impl ClickHouseSinkWriter {
.next()
.ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))?
.parse::<u8>()
.map_err(|e| SinkError::ClickHouse(format!("clickhouse sink error {}", e)))?
.map_err(|e| SinkError::ClickHouse(e.to_report_string()))?
} else {
0_u8
};
@@ -455,7 +456,7 @@ impl ClickHouseSinkWriter {
.first()
.ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))?
.parse::<u8>()
.map_err(|e| SinkError::ClickHouse(format!("clickhouse sink error {}", e)))?;
.map_err(|e| SinkError::ClickHouse(e.to_report_string()))?;

if length > 38 {
return Err(SinkError::ClickHouse(
@@ -467,7 +468,7 @@ impl ClickHouseSinkWriter {
.last()
.ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))?
.parse::<u8>()
.map_err(|e| SinkError::ClickHouse(format!("clickhouse sink error {}", e)))?;
.map_err(|e| SinkError::ClickHouse(e.to_report_string()))?;
(length, scale)
} else {
(0_u8, 0_u8)
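
In the ClickHouse sink, the format!("clickhouse sink error {}", e) wrappers are replaced with to_report_string(), which renders the error together with its source chain before it is stored in the error variant. A rough, self-contained sketch of that conversion — the single-variant SinkError enum here is a simplified stand-in for the real one:

use thiserror_ext::AsReport;

#[derive(Debug, thiserror::Error)]
enum SinkError {
    #[error("ClickHouse error: {0}")]
    ClickHouse(String), // stand-in for the real SinkError::ClickHouse variant
}

// Parse a precision/scale digit out of a type string such as "Decimal(38, 10)".
fn parse_precision(part: &str) -> Result<u8, SinkError> {
    part.trim()
        .parse::<u8>()
        // to_report_string() keeps the whole error chain, not just the top-level message.
        .map_err(|e| SinkError::ClickHouse(e.to_report_string()))
}
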
22 changes: 11 additions & 11 deletions src/connector/src/sink/deltalake.rs
@@ -15,7 +15,7 @@
use std::collections::HashMap;
use std::sync::Arc;

use anyhow::anyhow;
use anyhow::{anyhow, Context};
use async_trait::async_trait;
use deltalake::kernel::{Action, Add, DataType as DeltaLakeDataType, PrimitiveType, StructType};
use deltalake::protocol::{DeltaOperation, SaveMode};
@@ -26,6 +26,7 @@ use deltalake::table::builder::s3_storage_options::{
use deltalake::writer::{DeltaWriter, RecordBatchWriter};
use deltalake::DeltaTable;
use risingwave_common::array::{to_deltalake_record_batch_with_schema, StreamChunk};
use risingwave_common::bail;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::Schema;
use risingwave_common::types::DataType;
@@ -369,7 +370,8 @@ impl DeltaLakeSinkWriter {

async fn write(&mut self, chunk: StreamChunk) -> Result<()> {
let a = to_deltalake_record_batch_with_schema(self.dl_schema.clone(), &chunk)
.map_err(|err| SinkError::DeltaLake(anyhow!("convert record batch error: {}", err)))?;
.context("convert record batch error")
.map_err(SinkError::DeltaLake)?;
self.writer.write(a).await?;
Ok(())
}
@@ -381,7 +383,8 @@ fn convert_schema(schema: &StructType) -> Result<deltalake::arrow::datatypes::Sc
let dl_field = deltalake::arrow::datatypes::Field::new(
field.name(),
deltalake::arrow::datatypes::DataType::try_from(field.data_type())
.map_err(|err| SinkError::DeltaLake(anyhow!("convert schema error: {}", err)))?,
.context("convert schema error")
.map_err(SinkError::DeltaLake)?,
field.is_nullable(),
);
builder.push(dl_field);
@@ -484,9 +487,8 @@ impl<'a> TryFrom<&'a DeltaLakeWriteResult> for SinkMetadata {
type Error = SinkError;

fn try_from(value: &'a DeltaLakeWriteResult) -> std::prelude::v1::Result<Self, Self::Error> {
let metadata = serde_json::to_vec(&value.adds).map_err(|e| -> SinkError {
anyhow!("Can't serialized deltalake sink metadata: {}", e).into()
})?;
let metadata =
serde_json::to_vec(&value.adds).context("cannot serialize deltalake sink metadata")?;
Ok(SinkMetadata {
metadata: Some(Serialized(SerializedMetadata { metadata })),
})
@@ -496,13 +498,11 @@ impl<'a> TryFrom<&'a DeltaLakeWriteResult> for SinkMetadata {
impl DeltaLakeWriteResult {
fn try_from(value: &SinkMetadata) -> Result<Self> {
if let Some(Serialized(v)) = &value.metadata {
let adds =
serde_json::from_slice::<Vec<Add>>(&v.metadata).map_err(|e| -> SinkError {
anyhow!("Can't deserialize deltalake sink metadata: {}", e).into()
})?;
let adds = serde_json::from_slice::<Vec<Add>>(&v.metadata)
.context("Can't deserialize deltalake sink metadata")?;
Ok(DeltaLakeWriteResult { adds })
} else {
Err(anyhow!("Can't create deltalake sink write result from empty data!").into())
bail!("Can't create deltalake sink write result from empty data!")
}
}
}
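
The DeltaLake sink changes swap manual anyhow!("...: {}", e) formatting for anyhow's Context, which attaches a message while keeping the original error as the source, and use bail! for the empty-metadata early return. A small self-contained sketch of that shape, using anyhow's own bail! and a made-up WriteResult type (the real code uses risingwave_common::bail! and wraps errors into SinkError):

use anyhow::{bail, Context, Result};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct WriteResult {
    files: Vec<String>, // hypothetical payload standing in for the Add actions
}

fn decode_metadata(raw: Option<&[u8]>) -> Result<WriteResult> {
    if let Some(bytes) = raw {
        // context() wraps the serde_json error with a message while keeping it
        // as the source, instead of flattening it into a formatted string.
        let decoded = serde_json::from_slice::<WriteResult>(bytes)
            .context("can't deserialize sink metadata")?;
        Ok(decoded)
    } else {
        bail!("can't create sink write result from empty data")
    }
}
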
27 changes: 14 additions & 13 deletions src/connector/src/sink/doris.rs
@@ -15,7 +15,7 @@
use std::collections::HashMap;
use std::sync::Arc;

use anyhow::anyhow;
use anyhow::{anyhow, Context};
use async_trait::async_trait;
use base64::engine::general_purpose;
use base64::Engine;
@@ -31,6 +31,7 @@ use serde::Deserialize;
use serde_derive::Serialize;
use serde_json::Value;
use serde_with::serde_as;
use thiserror_ext::AsReport;
use with_options::WithOptions;

use super::doris_starrocks_connector::{
@@ -326,8 +327,9 @@ impl DorisSinkWriter {
DORIS_DELETE_SIGN.to_string(),
Value::String("0".to_string()),
);
let row_json_string = serde_json::to_string(&row_json_value)
.map_err(|e| SinkError::Doris(format!("Json derialize error {:?}", e)))?;
let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| {
SinkError::Doris(format!("Json derialize error: {}", e.as_report()))
})?;
self.client
.as_mut()
.ok_or_else(|| {
@@ -342,8 +344,9 @@ impl DorisSinkWriter {
DORIS_DELETE_SIGN.to_string(),
Value::String("1".to_string()),
);
let row_json_string = serde_json::to_string(&row_json_value)
.map_err(|e| SinkError::Doris(format!("Json derialize error {:?}", e)))?;
let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| {
SinkError::Doris(format!("Json derialize error: {}", e.as_report()))
})?;
self.client
.as_mut()
.ok_or_else(|| {
@@ -359,8 +362,9 @@ impl DorisSinkWriter {
DORIS_DELETE_SIGN.to_string(),
Value::String("0".to_string()),
);
let row_json_string = serde_json::to_string(&row_json_value)
.map_err(|e| SinkError::Doris(format!("Json derialize error {:?}", e)))?;
let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| {
SinkError::Doris(format!("Json derialize error: {}", e.as_report()))
})?;
self.client
.as_mut()
.ok_or_else(|| {
@@ -471,12 +475,9 @@ impl DorisSchemaClient {
} else {
raw_bytes
};
let schema: DorisSchema = serde_json::from_str(&json_data).map_err(|err| {
SinkError::DorisStarrocksConnect(anyhow::anyhow!(
"Can't get schema from json {:?}",
err
))
})?;
let schema: DorisSchema = serde_json::from_str(&json_data)
.context("Can't get schema from json")
.map_err(SinkError::DorisStarrocksConnect)?;
Ok(schema)
}
}
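
Where the error variant wraps an anyhow::Error, as SinkError::DorisStarrocksConnect does, the commit chains context() with a plain map_err into the variant instead of formatting the error by hand. A sketch under the assumption of a simplified one-variant enum (the real SinkError is larger and lives in the sink module):

use anyhow::Context;

#[derive(Debug, thiserror::Error)]
enum SinkError {
    #[error("Doris/StarRocks connect error: {0}")]
    DorisStarrocksConnect(anyhow::Error), // simplified stand-in
}

fn parse_schema(json_data: &str) -> Result<serde_json::Value, SinkError> {
    serde_json::from_str(json_data)
        // context() produces an anyhow::Error with the serde_json error as its source...
        .context("Can't get schema from json")
        // ...which is then wrapped into the sink error variant unchanged.
        .map_err(SinkError::DorisStarrocksConnect)
}
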
9 changes: 3 additions & 6 deletions src/connector/src/sink/doris_starrocks_connector.rs
@@ -16,6 +16,7 @@ use core::mem;
use core::time::Duration;
use std::collections::HashMap;

use anyhow::Context;
use base64::engine::general_purpose;
use base64::Engine;
use bytes::{BufMut, Bytes, BytesMut};
@@ -196,12 +197,8 @@ impl InserterInnerBuilder {
))
})?
.to_str()
.map_err(|err| {
SinkError::DorisStarrocksConnect(anyhow::anyhow!(
"Can't get doris BE url in header {:?}",
err
))
})?
.context("Can't get doris BE url in header")
.map_err(SinkError::DorisStarrocksConnect)?
} else {
return Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!(
"Can't get doris BE url",
3 changes: 2 additions & 1 deletion src/connector/src/sink/encoder/avro.rs
@@ -20,6 +20,7 @@ use risingwave_common::catalog::Schema;
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, DatumRef, ScalarRefImpl, StructType};
use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
use thiserror_ext::AsReport;

use super::{FieldEncodeError, Result as SinkResult, RowEncoder, SerTo};

@@ -134,7 +135,7 @@ impl SerTo<Vec<u8>> for AvroEncoded {
)));
};
let raw = apache_avro::to_avro_datum(&self.schema, self.value)
.map_err(|e| crate::sink::SinkError::Encode(e.to_string()))?;
.map_err(|e| crate::sink::SinkError::Encode(e.to_report_string()))?;
let mut buf = Vec::with_capacity(1 + 4 + raw.len());
buf.put_u8(0);
buf.put_i32(schema_id);
10 changes: 6 additions & 4 deletions src/connector/src/sink/encoder/json.rs
@@ -15,6 +15,7 @@
use std::collections::HashMap;
use std::sync::Arc;

use anyhow::Context;
use base64::engine::general_purpose;
use base64::Engine as _;
use chrono::{Datelike, NaiveDateTime, Timelike};
@@ -26,6 +27,7 @@ use risingwave_common::row::Row;
use risingwave_common::types::{DataType, DatumRef, Decimal, JsonbVal, ScalarRefImpl, ToText};
use risingwave_common::util::iter_util::ZipEqDebug;
use serde_json::{json, Map, Value};
use thiserror_ext::AsReport;

use super::{
CustomJsonType, DateHandlingMode, KafkaConnectParams, KafkaConnectParamsRef, Result,
@@ -134,7 +136,7 @@ impl RowEncoder for JsonEncoder {
self.time_handling_mode,
&self.custom_json_type,
)
.map_err(|e| SinkError::Encode(e.to_string()))?;
.map_err(|e| SinkError::Encode(e.to_report_string()))?;
mappings.insert(key, value);
}

@@ -311,9 +313,9 @@ fn datum_to_json_object(
)?;
map.insert(sub_field.name.clone(), value);
}
Value::String(serde_json::to_string(&map).map_err(|err| {
ArrayError::internal(format!("Json to string err{:?}", err))
})?)
Value::String(
serde_json::to_string(&map).context("failed to serialize into JSON")?,
)
}
CustomJsonType::Es | CustomJsonType::None => {
let mut map = Map::with_capacity(st.len());
6 changes: 3 additions & 3 deletions src/connector/src/sink/iceberg/mod.rs
@@ -21,7 +21,7 @@ use std::fmt::Debug;
use std::ops::Deref;
use std::sync::Arc;

use anyhow::anyhow;
use anyhow::{anyhow, Context};
use arrow_schema::{
DataType as ArrowDataType, Field as ArrowField, Fields, Schema as ArrowSchema, SchemaRef,
};
@@ -384,14 +384,14 @@ impl IcebergConfig {
let catalog = self
.create_catalog()
.await
.map_err(|e| anyhow!("Unable to load iceberg catalog: {e}"))?;
.context("Unable to load iceberg catalog")?;

let table_id = TableIdentifier::new(
vec![self.database_name.as_str()]
.into_iter()
.chain(self.table_name.split('.')),
)
.map_err(|e| anyhow!("Unable to parse table name: {e}"))?;
.context("Unable to parse table name")?;

catalog
.load_table(&table_id)
