Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sink): fix starrocks doris and clickhouse decimal #15664

Merged
merged 6 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions ci/scripts/e2e-clickhouse-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ sleep 1
echo "--- create clickhouse table"
curl https://clickhouse.com/ | sh
sleep 2
./clickhouse client --host=clickhouse-server --port=9000 --query="CREATE table demo_test(v1 Int32,v2 Int64,v3 String,v4 Enum16('A'=1,'B'=2))ENGINE = ReplacingMergeTree PRIMARY KEY (v1);"
./clickhouse client --host=clickhouse-server --port=9000 --query="CREATE table demo_test(v1 Int32,v2 Int64,v3 String,v4 Enum16('A'=1,'B'=2), v5 decimal64(3))ENGINE = ReplacingMergeTree PRIMARY KEY (v1);"

echo "--- testing sinks"
sqllogictest -p 4566 -d dev './e2e_test/sink/clickhouse_sink.slt'
Expand All @@ -41,13 +41,13 @@ sleep 5

# check sink destination using shell
if cat ./query_result.csv | sort | awk -F "," '{
if ($1 == 1 && $2 == 50 && $3 == "\"1-50\"" && $4 == "\"A\"") c1++;
if ($1 == 13 && $2 == 2 && $3 == "\"13-2\"" && $4 == "\"B\"") c2++;
if ($1 == 2 && $2 == 2 && $3 == "\"2-2\"" && $4 == "\"B\"") c3++;
if ($1 == 21 && $2 == 2 && $3 == "\"21-2\"" && $4 == "\"A\"") c4++;
if ($1 == 3 && $2 == 2 && $3 == "\"3-2\"" && $4 == "\"A\"") c5++;
if ($1 == 5 && $2 == 2 && $3 == "\"5-2\"" && $4 == "\"B\"") c6++;
if ($1 == 8 && $2 == 2 && $3 == "\"8-2\"" && $4 == "\"A\"") c7++; }
if ($1 == 1 && $2 == 50 && $3 == "\"1-50\"" && $4 == "\"A\"" && $5 == 1.1) c1++;
if ($1 == 13 && $2 == 2 && $3 == "\"13-2\"" && $4 == "\"B\"" && $5 == 0) c2++;
if ($1 == 2 && $2 == 2 && $3 == "\"2-2\"" && $4 == "\"B\"" && $5 == 2.2) c3++;
if ($1 == 21 && $2 == 2 && $3 == "\"21-2\"" && $4 == "\"A\"" && $5 == 0) c4++;
if ($1 == 3 && $2 == 2 && $3 == "\"3-2\"" && $4 == "\"A\"" && $5 == 3.3) c5++;
if ($1 == 5 && $2 == 2 && $3 == "\"5-2\"" && $4 == "\"B\"" && $5 == 4.4) c6++;
if ($1 == 8 && $2 == 2 && $3 == "\"8-2\"" && $4 == "\"A\"" && $5 == 0) c7++; }
END { exit !(c1 == 1 && c2 == 1 && c3 == 1 && c4 == 1 && c5 == 1 && c6 == 1 && c7 == 1); }'; then
echo "Clickhouse sink check passed"
else
Expand Down
4 changes: 2 additions & 2 deletions ci/scripts/e2e-starrocks-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ echo "--- create starrocks table"
apt-get update -y && apt-get install -y mysql-client
sleep 2
mysql -uroot -P 9030 -h starrocks-fe-server -e "CREATE database demo;use demo;
CREATE table demo_bhv_table(v1 int,v2 smallint,v3 bigint,v4 float,v5 double,v6 string,v7 date,v8 datetime,v9 boolean,v10 json) ENGINE=OLAP
CREATE table demo_bhv_table(v1 int,v2 smallint,v3 bigint,v4 float,v5 double,v6 string,v7 date,v8 datetime,v9 boolean,v10 json,v11 decimal(10,5)) ENGINE=OLAP
PRIMARY KEY(\`v1\`)
DISTRIBUTED BY HASH(\`v1\`) properties(\"replication_num\" = \"1\");
CREATE USER 'users'@'%' IDENTIFIED BY '123456';
Expand All @@ -46,7 +46,7 @@ mysql -uroot -P 9030 -h starrocks-fe-server -e "select * from demo.demo_bhv_tabl


if cat ./query_result.csv | sed '1d; s/\t/,/g' | awk -F "," '{
exit !($1 == 1 && $2 == 1 && $3 == 1 && $4 == 1.1 && $5 == 1.2 && $6 == "test" && $7 == "2013-01-01" && $8 == "2013-01-01 01:01:01" && $9 == 0 && $10 = "{"v101": 100}"); }'; then
exit !($1 == 1 && $2 == 1 && $3 == 1 && $4 == 1.1 && $5 == 1.2 && $6 == "test" && $7 == "2013-01-01" && $8 == "2013-01-01 01:01:01" && $9 == 0 && $10 = "{"v101": 100}" && $11 == 1.12346); }'; then
echo "Starrocks sink check passed"
else
cat ./query_result.csv
Expand Down
6 changes: 3 additions & 3 deletions e2e_test/sink/clickhouse_sink.slt
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
statement ok
CREATE TABLE t6 (v1 int primary key, v2 bigint, v3 varchar, v4 smallint);
CREATE TABLE t6 (v1 int primary key, v2 bigint, v3 varchar, v4 smallint, v5 decimal);

statement ok
CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;

statement ok
CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3, mv6.v4 as v4 from mv6 WITH (
CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3, mv6.v4 as v4, mv6.v5 as v5 from mv6 WITH (
connector = 'clickhouse',
type = 'append-only',
force_append_only='true',
Expand All @@ -17,7 +17,7 @@ CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3, mv6.v4 as v4
);

statement ok
INSERT INTO t6 VALUES (1, 50, '1-50', 1), (2, 2, '2-2', 2), (3, 2, '3-2', 1), (5, 2, '5-2', 2), (8, 2, '8-2', 1), (13, 2, '13-2', 2), (21, 2, '21-2', 1);
INSERT INTO t6 VALUES (1, 50, '1-50', 1, 1.1), (2, 2, '2-2', 2, 2.2), (3, 2, '3-2', 1, 3.3), (5, 2, '5-2', 2, 4.4), (8, 2, '8-2', 1, 'inf'), (13, 2, '13-2', 2, '-inf'), (21, 2, '21-2', 1, 'nan');

statement ok
FLUSH;
Expand Down
4 changes: 2 additions & 2 deletions e2e_test/sink/starrocks_sink.slt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
statement ok
CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamp, v9 boolean, v10 jsonb);
CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamp, v9 boolean, v10 jsonb, v11 decimal);

statement ok
CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;
Expand All @@ -21,7 +21,7 @@ FROM
);

statement ok
INSERT INTO t6 VALUES (1, 1, 1, 1.1, 1.2, 'test', '2013-01-01', '2013-01-01 01:01:01' , false, '{"v101":100}');
INSERT INTO t6 VALUES (1, 1, 1, 1.1, 1.2, 'test', '2013-01-01', '2013-01-01 01:01:01' , false, '{"v101":100}',1.12345678910);

statement ok
FLUSH;
Expand Down
29 changes: 15 additions & 14 deletions src/connector/src/sink/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use serde::Serialize;
use serde_derive::Deserialize;
use serde_with::serde_as;
use thiserror_ext::AsReport;
use tracing::warn;
use with_options::WithOptions;

use super::{DummySinkCommitCoordinator, SinkWriterParam};
Expand Down Expand Up @@ -747,27 +748,27 @@ impl ClickHouseFieldWithNull {
ScalarRefImpl::Utf8(v) => ClickHouseField::String(v.to_string()),
ScalarRefImpl::Bool(v) => ClickHouseField::Bool(v),
ScalarRefImpl::Decimal(d) => {
if let Decimal::Normalized(d) = d {
let d = if let Decimal::Normalized(d) = d {
let scale =
clickhouse_schema_feature.accuracy_decimal.1 as i32 - d.scale() as i32;

let scale = if scale < 0 {
if scale < 0 {
d.mantissa() / 10_i128.pow(scale.unsigned_abs())
} else {
d.mantissa() * 10_i128.pow(scale as u32)
};

if clickhouse_schema_feature.accuracy_decimal.0 <= 9 {
ClickHouseField::Decimal(ClickHouseDecimal::Decimal32(scale as i32))
} else if clickhouse_schema_feature.accuracy_decimal.0 <= 18 {
ClickHouseField::Decimal(ClickHouseDecimal::Decimal64(scale as i64))
} else {
ClickHouseField::Decimal(ClickHouseDecimal::Decimal128(scale))
}
} else if clickhouse_schema_feature.can_null {
warn!("Inf, -Inf, Nan in RW decimal is converted into clickhouse null!");
return Ok(vec![ClickHouseFieldWithNull::None]);
} else {
return Err(SinkError::ClickHouse(
"clickhouse can not support Decimal NAN,-INF and INF".to_string(),
));
warn!("Inf, -Inf, Nan in RW decimal is converted into clickhouse 0!");
0_i128
};
if clickhouse_schema_feature.accuracy_decimal.0 <= 9 {
ClickHouseField::Decimal(ClickHouseDecimal::Decimal32(d as i32))
} else if clickhouse_schema_feature.accuracy_decimal.0 <= 18 {
ClickHouseField::Decimal(ClickHouseDecimal::Decimal64(d as i64))
} else {
ClickHouseField::Decimal(ClickHouseDecimal::Decimal128(d))
}
}
ScalarRefImpl::Interval(_) => {
Expand Down
6 changes: 2 additions & 4 deletions src/connector/src/sink/doris.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,11 +495,9 @@ pub struct DorisField {
aggregation_type: String,
}
impl DorisField {
pub fn get_decimal_pre_scale(&self) -> Option<(u8, u8)> {
pub fn get_decimal_pre_scale(&self) -> Option<u8> {
if self.r#type.contains("DECIMAL") {
let a = self.precision.clone().unwrap().parse::<u8>().unwrap();
let b = self.scale.clone().unwrap().parse::<u8>().unwrap();
Some((a, b))
Some(self.scale.clone().unwrap().parse::<u8>().unwrap())
} else {
None
}
Expand Down
45 changes: 17 additions & 28 deletions src/connector/src/sink/encoder/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use itertools::Itertools;
use risingwave_common::array::{ArrayError, ArrayResult};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, DatumRef, Decimal, JsonbVal, ScalarRefImpl, ToText};
use risingwave_common::types::{DataType, DatumRef, JsonbVal, ScalarRefImpl, ToText};
use risingwave_common::util::iter_util::ZipEqDebug;
use serde_json::{json, Map, Value};
use thiserror_ext::AsReport;
Expand Down Expand Up @@ -83,7 +83,7 @@ impl JsonEncoder {
pub fn new_with_doris(
schema: Schema,
col_indices: Option<Vec<usize>>,
map: HashMap<String, (u8, u8)>,
map: HashMap<String, u8>,
) -> Self {
Self {
schema,
Expand All @@ -97,19 +97,15 @@ impl JsonEncoder {
}
}

pub fn new_with_starrocks(
schema: Schema,
col_indices: Option<Vec<usize>>,
map: HashMap<String, (u8, u8)>,
) -> Self {
pub fn new_with_starrocks(schema: Schema, col_indices: Option<Vec<usize>>) -> Self {
Self {
schema,
col_indices,
time_handling_mode: TimeHandlingMode::Milli,
date_handling_mode: DateHandlingMode::String,
timestamp_handling_mode: TimestampHandlingMode::String,
timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
custom_json_type: CustomJsonType::StarRocks(map),
custom_json_type: CustomJsonType::StarRocks,
kafka_connect: None,
}
}
Expand Down Expand Up @@ -242,24 +238,17 @@ fn datum_to_json_object(
(DataType::Varchar, ScalarRefImpl::Utf8(v)) => {
json!(v)
}
// Doris/Starrocks will convert out-of-bounds decimal and -INF, INF, NAN to NULL
(DataType::Decimal, ScalarRefImpl::Decimal(mut v)) => match custom_json_type {
CustomJsonType::Doris(map) | CustomJsonType::StarRocks(map) => {
if !matches!(v, Decimal::Normalized(_)) {
return Err(ArrayError::internal(
"doris/starrocks can't support decimal Inf, -Inf, Nan".to_string(),
));
}
let (p, s) = map.get(&field.name).unwrap();
CustomJsonType::Doris(map) => {
let s = map.get(&field.name).unwrap();
v.rescale(*s as u32);
let v_string = v.to_text();
let len = v_string.clone().replace(['.', '-'], "").len();
if len > *p as usize {
return Err(ArrayError::internal(
format!("rw Decimal's precision is large than doris/starrocks max decimal len is {:?}, doris max is {:?}",v_string.len(),p)));
}
json!(v_string)
json!(v.to_text())
}
CustomJsonType::Es | CustomJsonType::None | CustomJsonType::BigQuery => {
CustomJsonType::Es
| CustomJsonType::None
| CustomJsonType::BigQuery
| CustomJsonType::StarRocks => {
json!(v.to_text())
}
},
Expand Down Expand Up @@ -310,7 +299,7 @@ fn datum_to_json_object(
json!(v.as_iso_8601())
}
(DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => match custom_json_type {
CustomJsonType::Es | CustomJsonType::StarRocks(_) => JsonbVal::from(jsonb_ref).take(),
CustomJsonType::Es | CustomJsonType::StarRocks => JsonbVal::from(jsonb_ref).take(),
CustomJsonType::Doris(_) | CustomJsonType::None | CustomJsonType::BigQuery => {
json!(jsonb_ref.to_string())
}
Expand Down Expand Up @@ -357,7 +346,7 @@ fn datum_to_json_object(
serde_json::to_string(&map).context("failed to serialize into JSON")?,
)
}
CustomJsonType::StarRocks(_) => {
CustomJsonType::StarRocks => {
return Err(ArrayError::internal(
"starrocks can't support struct".to_string(),
));
Expand Down Expand Up @@ -470,8 +459,8 @@ fn type_as_json_schema(rw_type: &DataType) -> Map<String, Value> {
mod tests {

use risingwave_common::types::{
DataType, Date, Interval, Scalar, ScalarImpl, StructRef, StructType, StructValue, Time,
Timestamp,
DataType, Date, Decimal, Interval, Scalar, ScalarImpl, StructRef, StructType, StructValue,
Time, Timestamp,
};

use super::*;
Expand Down Expand Up @@ -639,7 +628,7 @@ mod tests {
assert_eq!(interval_value, json!("P1Y1M2DT0H0M1S"));

let mut map = HashMap::default();
map.insert("aaa".to_string(), (10_u8, 5_u8));
map.insert("aaa".to_string(), 5_u8);
let decimal = datum_to_json_object(
&Field {
data_type: DataType::Decimal,
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/sink/encoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,11 @@ pub enum CustomJsonType {
// Doris requires dates in its JSON to be encoded as strings.
// The internal order of the struct should follow the insertion order.
// The decimal needs verification and calibration.
Doris(HashMap<String, (u8, u8)>),
Doris(HashMap<String, u8>),
// Es requires jsonb to be emitted as a JSON value rather than a string.
Es,
// starrocks' need jsonb is struct
StarRocks(HashMap<String, (u8, u8)>),
StarRocks,
// BigQuery requires a null array to be encoded as [].
BigQuery,
None,
Expand Down
50 changes: 3 additions & 47 deletions src/connector/src/sink/starrocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::sync::Arc;
use anyhow::anyhow;
use async_trait::async_trait;
use bytes::Bytes;
use itertools::Itertools;
use mysql_async::prelude::Queryable;
use mysql_async::Opts;
use risingwave_common::array::{Op, StreamChunk};
Expand Down Expand Up @@ -223,8 +222,7 @@ impl Sink for StarrocksSink {
self.schema.clone(),
self.pk_indices.clone(),
self.is_append_only,
)
.await?
)?
.into_log_sinker(writer_param.sink_metrics))
}

Expand Down Expand Up @@ -293,54 +291,12 @@ impl TryFrom<SinkParam> for StarrocksSink {
}

impl StarrocksSinkWriter {
pub async fn new(
pub fn new(
config: StarrocksConfig,
schema: Schema,
pk_indices: Vec<usize>,
is_append_only: bool,
) -> Result<Self> {
let mut decimal_map = HashMap::default();
let starrocks_columns = StarrocksSchemaClient::new(
config.common.host.clone(),
config.common.mysql_port.clone(),
config.common.table.clone(),
config.common.database.clone(),
config.common.user.clone(),
config.common.password.clone(),
)
.await?
.get_columns_from_starrocks()
.await?;

for (name, column_type) in &starrocks_columns {
if column_type.contains("decimal") {
let decimal_all = column_type
.split("decimal(")
.last()
.ok_or_else(|| SinkError::Starrocks("must have last".to_string()))?
.split(')')
.next()
.ok_or_else(|| SinkError::Starrocks("must have next".to_string()))?
.split(',')
.collect_vec();
let length = decimal_all
.first()
.ok_or_else(|| SinkError::Starrocks("must have next".to_string()))?
.parse::<u8>()
.map_err(|e| {
SinkError::Starrocks(format!("starrocks sink error: {}", e.as_report()))
})?;

let scale = decimal_all
.last()
.ok_or_else(|| SinkError::Starrocks("must have next".to_string()))?
.parse::<u8>()
.map_err(|e| {
SinkError::Starrocks(format!("starrocks sink error: {}", e.as_report()))
})?;
decimal_map.insert(name.to_string(), (length, scale));
}
}
let mut fields_name = schema.names_str();
if !is_append_only {
fields_name.push(STARROCKS_DELETE_SIGN);
Expand All @@ -367,7 +323,7 @@ impl StarrocksSinkWriter {
inserter_innet_builder: starrocks_insert_builder,
is_append_only,
client: None,
row_encoder: JsonEncoder::new_with_starrocks(schema, None, decimal_map),
row_encoder: JsonEncoder::new_with_starrocks(schema, None),
})
}

Expand Down
Loading