feat(sink): set default_sink_decouple = true for all sink (#18182)
xxhZs authored Aug 26, 2024
1 parent 220fded commit 7d20fa8
Showing 19 changed files with 121 additions and 181 deletions.
3 changes: 3 additions & 0 deletions e2e_test/sink/cassandra_sink.slt
@@ -10,6 +10,9 @@ CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float,
statement ok
CREATE TABLE t7 ("TEST_V1" int primary key, "TEST_V2" int, "TEST_V3" int);

statement ok
set sink_decouple = false;

statement ok
CREATE SINK s6
FROM
1 change: 1 addition & 0 deletions e2e_test/sink/clickhouse_sink.slt
@@ -17,6 +17,7 @@ CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3, mv6.v4 as v4,
clickhouse.password = '',
clickhouse.database = 'default',
clickhouse.table='demo_test',
commit_checkpoint_interval = 1,
);

statement ok
3 changes: 3 additions & 0 deletions e2e_test/sink/elasticsearch/elasticsearch_sink.slt
@@ -1,3 +1,6 @@
statement ok
set sink_decouple = false;

statement ok
CREATE TABLE t7 (
v1 int primary key,
21 changes: 0 additions & 21 deletions src/connector/src/lib.rs
@@ -75,27 +75,6 @@ where
})
}

pub(crate) fn deserialize_optional_u64_from_string<'de, D>(
deserializer: D,
) -> Result<Option<u64>, D::Error>
where
D: de::Deserializer<'de>,
{
let s: String = de::Deserialize::deserialize(deserializer)?;
if s.is_empty() {
Ok(None)
} else {
s.parse()
.map_err(|_| {
de::Error::invalid_value(
de::Unexpected::Str(&s),
&"integer greater than or equal to 0",
)
})
.map(Some)
}
}

pub(crate) fn deserialize_optional_string_seq_from_string<'de, D>(
deserializer: D,
) -> std::result::Result<Option<Vec<String>>, D::Error>
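
The helper removed above parsed optional u64 values out of strings; in this commit it is superseded by serde_with's DisplayFromStr together with a shared default function, as the ClickHouse and DeltaLake configs below show. A minimal, self-contained sketch of that pattern (struct and field names here are illustrative, not the actual RisingWave types):

use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};

const DEFAULT_COMMIT_CHECKPOINT_INTERVAL: u64 = 10;

fn default_commit_checkpoint_interval() -> u64 {
    DEFAULT_COMMIT_CHECKPOINT_INTERVAL
}

#[serde_as]
#[derive(Deserialize, Debug)]
struct ExampleSinkCommon {
    /// WITH options arrive as strings, so "10" must be parsed into a u64;
    /// when the option is absent, the shared default of 10 applies.
    #[serde(default = "default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    commit_checkpoint_interval: u64,
}

fn main() {
    // Explicit string value is parsed via `DisplayFromStr`.
    let set: ExampleSinkCommon =
        serde_json::from_str(r#"{"commit_checkpoint_interval": "3"}"#).unwrap();
    assert_eq!(set.commit_checkpoint_interval, 3);

    // Missing value falls back to the shared default.
    let unset: ExampleSinkCommon = serde_json::from_str("{}").unwrap();
    assert_eq!(unset.commit_checkpoint_interval, 10);
}
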
43 changes: 23 additions & 20 deletions src/connector/src/sink/clickhouse.rs
@@ -30,16 +30,18 @@ use risingwave_common::types::{DataType, Decimal, ScalarRefImpl, Serial};
use serde::ser::{SerializeSeq, SerializeStruct};
use serde::Serialize;
use serde_derive::Deserialize;
use serde_with::serde_as;
use serde_with::{serde_as, DisplayFromStr};
use thiserror_ext::AsReport;
use tonic::async_trait;
use tracing::warn;
use with_options::WithOptions;

use super::decouple_checkpoint_log_sink::DecoupleCheckpointLogSinkerOf;
use super::decouple_checkpoint_log_sink::{
default_commit_checkpoint_interval, DecoupleCheckpointLogSinkerOf,
DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
};
use super::writer::SinkWriter;
use super::{DummySinkCommitCoordinator, SinkWriterParam};
use crate::deserialize_optional_u64_from_string;
use crate::error::ConnectorResult;
use crate::sink::catalog::desc::SinkDesc;
use crate::sink::{
@@ -52,6 +54,7 @@ const QUERY_COLUMN: &str =
"select distinct ?fields from system.columns where database = ? and table = ? order by ?";
pub const CLICKHOUSE_SINK: &str = "clickhouse";

#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct ClickHouseCommon {
#[serde(rename = "clickhouse.url")]
@@ -66,9 +69,10 @@ pub struct ClickHouseCommon {
pub table: String,
#[serde(rename = "clickhouse.delete.column")]
pub delete_column: Option<String>,
/// Commit every n(>0) checkpoints, if n is not set, we will commit every checkpoint.
#[serde(default, deserialize_with = "deserialize_optional_u64_from_string")]
pub commit_checkpoint_interval: Option<u64>,
/// Commit every n(>0) checkpoints, default is 10.
#[serde(default = "default_commit_checkpoint_interval")]
#[serde_as(as = "DisplayFromStr")]
pub commit_checkpoint_interval: u64,
}

#[allow(clippy::enum_variant_names)]
@@ -494,26 +498,25 @@ impl Sink for ClickHouseSink {
const SINK_NAME: &'static str = CLICKHOUSE_SINK;

fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
let config_decouple = if let Some(interval) =
desc.properties.get("commit_checkpoint_interval")
&& interval.parse::<u64>().unwrap_or(0) > 1
{
true
} else {
false
};
let commit_checkpoint_interval =
if let Some(interval) = desc.properties.get("commit_checkpoint_interval") {
interval
.parse::<u64>()
.unwrap_or(DEFAULT_COMMIT_CHECKPOINT_INTERVAL)
} else {
DEFAULT_COMMIT_CHECKPOINT_INTERVAL
};

match user_specified {
SinkDecouple::Default => Ok(config_decouple),
SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
SinkDecouple::Disable => {
if config_decouple {
if commit_checkpoint_interval > 1 {
return Err(SinkError::Config(anyhow!(
"config conflict: Clickhouse config `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
)));
}
Ok(false)
}
SinkDecouple::Enable => Ok(true),
}
}

@@ -552,9 +555,9 @@ impl Sink for ClickHouseSink {
self.check_pk_match(&clickhouse_column)?;
}

if self.config.common.commit_checkpoint_interval == Some(0) {
if self.config.common.commit_checkpoint_interval == 0 {
return Err(SinkError::Config(anyhow!(
"commit_checkpoint_interval must be greater than 0"
"`commit_checkpoint_interval` must be greater than 0"
)));
}
Ok(())
@@ -569,7 +572,7 @@ impl Sink for ClickHouseSink {
)
.await?;
let commit_checkpoint_interval =
NonZeroU64::new(self.config.common.commit_checkpoint_interval.unwrap_or(1)).expect(
NonZeroU64::new(self.config.common.commit_checkpoint_interval).expect(
"commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
);

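
The two is_sink_decouple rewrites in this commit (ClickHouse above, DeltaLake below) follow the same shape: decoupling is on for Default and Enable, and explicitly disabling it is rejected once commit_checkpoint_interval is greater than 1. A simplified, illustrative sketch of that decision, not the actual trait method:

#[derive(Debug)]
enum SinkDecouple {
    Default,
    Enable,
    Disable,
}

/// Simplified stand-in for the per-sink `is_sink_decouple` checks in this diff.
fn decide_decouple(interval: Option<&str>, user: SinkDecouple) -> Result<bool, String> {
    // Missing or unparsable values fall back to the new default of 10.
    let interval = interval.and_then(|s| s.parse::<u64>().ok()).unwrap_or(10);
    match user {
        SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
        SinkDecouple::Disable if interval > 1 => Err(
            "`commit_checkpoint_interval` > 1 requires sink decouple to stay enabled".to_string(),
        ),
        SinkDecouple::Disable => Ok(false),
    }
}

fn main() {
    assert_eq!(decide_decouple(None, SinkDecouple::Default), Ok(true));
    assert_eq!(decide_decouple(Some("1"), SinkDecouple::Disable), Ok(false));
    assert!(decide_decouple(Some("10"), SinkDecouple::Disable).is_err());
}
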
5 changes: 5 additions & 0 deletions src/connector/src/sink/decouple_checkpoint_log_sink.rs
@@ -20,6 +20,11 @@ use async_trait::async_trait;
use crate::sink::log_store::{LogStoreReadItem, TruncateOffset};
use crate::sink::writer::SinkWriter;
use crate::sink::{LogSinker, Result, SinkLogReader, SinkMetrics};
pub const DEFAULT_COMMIT_CHECKPOINT_INTERVAL: u64 = 10;

pub fn default_commit_checkpoint_interval() -> u64 {
DEFAULT_COMMIT_CHECKPOINT_INTERVAL
}

/// The `LogSinker` implementation used for commit-decoupled sinks (such as `Iceberg`, `DeltaLake` and `StarRocks`).
/// The concurrent/frequent commit capability of these sinks is poor, so by leveraging the decoupled log reader,
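
Once validation has ruled out 0, the configured interval is turned into a NonZeroU64 when the decoupled log sinker is built, as the ClickHouse and DeltaLake call sites in this diff show. A minimal sketch of that step (the helper name is illustrative):

use std::num::NonZeroU64;

/// Hypothetical helper mirroring the call sites in this diff; `validate()` has
/// already rejected 0, so the `expect` cannot fire in practice.
fn checkpoint_interval(configured: u64) -> NonZeroU64 {
    NonZeroU64::new(configured)
        .expect("commit_checkpoint_interval should be greater than 0, and it should be checked in config validation")
}

fn main() {
    assert_eq!(checkpoint_interval(10).get(), 10);
}
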
44 changes: 24 additions & 20 deletions src/connector/src/sink/deltalake.rs
@@ -38,23 +38,26 @@ use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use risingwave_pb::connector_service::SinkMetadata;
use serde_derive::{Deserialize, Serialize};
use serde_with::serde_as;
use serde_with::{serde_as, DisplayFromStr};
use with_options::WithOptions;

use super::catalog::desc::SinkDesc;
use super::coordinate::CoordinatedSinkWriter;
use super::decouple_checkpoint_log_sink::DecoupleCheckpointLogSinkerOf;
use super::decouple_checkpoint_log_sink::{
default_commit_checkpoint_interval, DecoupleCheckpointLogSinkerOf,
DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
};
use super::writer::SinkWriter;
use super::{
Result, Sink, SinkCommitCoordinator, SinkError, SinkParam, SinkWriterParam,
SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION,
};
use crate::deserialize_optional_u64_from_string;

pub const DELTALAKE_SINK: &str = "deltalake";
pub const DEFAULT_REGION: &str = "us-east-1";
pub const GCS_SERVICE_ACCOUNT: &str = "service_account_key";

#[serde_as]
#[derive(Deserialize, Serialize, Debug, Clone, WithOptions)]
pub struct DeltaLakeCommon {
#[serde(rename = "s3.access.key")]
@@ -69,10 +72,12 @@ pub struct DeltaLakeCommon {
pub s3_endpoint: Option<String>,
#[serde(rename = "gcs.service.account")]
pub gcs_service_account: Option<String>,
/// Commit every n(>0) checkpoints, if n is not set, we will commit every checkpoint.
#[serde(default, deserialize_with = "deserialize_optional_u64_from_string")]
pub commit_checkpoint_interval: Option<u64>,
/// Commit every n(>0) checkpoints, default is 10.
#[serde(default = "default_commit_checkpoint_interval")]
#[serde_as(as = "DisplayFromStr")]
pub commit_checkpoint_interval: u64,
}

impl DeltaLakeCommon {
pub async fn create_deltalake_client(&self) -> Result<DeltaTable> {
let table = match Self::get_table_url(&self.location)? {
@@ -281,26 +286,25 @@ impl Sink for DeltaLakeSink {
const SINK_NAME: &'static str = DELTALAKE_SINK;

fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
let config_decouple = if let Some(interval) =
desc.properties.get("commit_checkpoint_interval")
&& interval.parse::<u64>().unwrap_or(0) > 1
{
true
} else {
false
};
let commit_checkpoint_interval =
if let Some(interval) = desc.properties.get("commit_checkpoint_interval") {
interval
.parse::<u64>()
.unwrap_or(DEFAULT_COMMIT_CHECKPOINT_INTERVAL)
} else {
DEFAULT_COMMIT_CHECKPOINT_INTERVAL
};

match user_specified {
SinkDecouple::Default => Ok(config_decouple),
SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
SinkDecouple::Disable => {
if config_decouple {
if commit_checkpoint_interval > 1 {
return Err(SinkError::Config(anyhow!(
"config conflict: DeltaLake config `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
)));
}
Ok(false)
}
SinkDecouple::Enable => Ok(true),
}
}

@@ -328,7 +332,7 @@ impl Sink for DeltaLakeSink {
.await?;

let commit_checkpoint_interval =
NonZeroU64::new(self.config.common.commit_checkpoint_interval.unwrap_or(1)).expect(
NonZeroU64::new(self.config.common.commit_checkpoint_interval).expect(
"commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
);

@@ -380,9 +384,9 @@ impl Sink for DeltaLakeSink {
)));
}
}
if self.config.common.commit_checkpoint_interval == Some(0) {
if self.config.common.commit_checkpoint_interval == 0 {
return Err(SinkError::Config(anyhow!(
"commit_checkpoint_interval must be greater than 0"
"`commit_checkpoint_interval` must be greater than 0"
)));
}
Ok(())
9 changes: 0 additions & 9 deletions src/connector/src/sink/google_pubsub.rs
@@ -29,13 +29,11 @@ use google_cloud_pubsub::client::{Client, ClientConfig};
use google_cloud_pubsub::publisher::{Awaiter, Publisher};
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use serde_derive::Deserialize;
use serde_with::serde_as;
use tonic::Status;
use with_options::WithOptions;

use super::catalog::desc::SinkDesc;
use super::catalog::SinkFormatDesc;
use super::formatter::SinkFormatterImpl;
use super::log_store::DeliveryFutureManagerAddFuture;
@@ -114,13 +112,6 @@ impl Sink for GooglePubSubSink {

const SINK_NAME: &'static str = PUBSUB_SINK;

fn is_sink_decouple(_desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
match user_specified {
SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
SinkDecouple::Disable => Ok(false),
}
}

async fn validate(&self) -> Result<()> {
if !self.is_append_only {
return Err(SinkError::GooglePubSub(anyhow!(
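
The override removed from the Google Pub/Sub sink simply restated what the commit title says is now the blanket behavior, so it is presumably redundant against the trait-level default. An assumed, simplified sketch of what that default looks like after this change (not the actual trait, which also takes a SinkDesc argument):

#[derive(Debug)]
enum SinkDecouple {
    Default,
    Enable,
    Disable,
}

trait Sink {
    /// Assumed default: decouple unless the session explicitly disables it.
    fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool, String> {
        match user_specified {
            SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
            SinkDecouple::Disable => Ok(false),
        }
    }
}

struct PubSubLikeSink;
impl Sink for PubSubLikeSink {}

fn main() {
    assert_eq!(PubSubLikeSink::is_sink_decouple(&SinkDecouple::Default), Ok(true));
    assert_eq!(PubSubLikeSink::is_sink_decouple(&SinkDecouple::Disable), Ok(false));
}
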
(Diff truncated: the remaining changed files in this commit are not shown here.)
