Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sink): decouple starrocks commit from risingwave commit #16816

Merged
merged 20 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/connector/src/sink/decouple_checkpoint_log_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ use crate::sink::log_store::{LogStoreReadItem, TruncateOffset};
use crate::sink::writer::SinkWriter;
use crate::sink::{LogSinker, Result, SinkLogReader, SinkMetrics};

/// The `LogSinker` implementation used for commit-decoupled sinks (such as `Iceberg`, `DeltaLake` and `StarRocks`).
/// The concurrent/frequent commit capability of these sinks is poor, so by leveraging the decoupled log reader,
/// we delay the checkpoint barrier to make commits less frequent.
pub struct DecoupleCheckpointLogSinkerOf<W> {
ly9chee marked this conversation as resolved.
Show resolved Hide resolved
writer: W,
sink_metrics: SinkMetrics,
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/sink/deltalake.rs
fuyufjh marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub struct DeltaLakeCommon {
pub s3_endpoint: Option<String>,
#[serde(rename = "gcs.service.account")]
pub gcs_service_account: Option<String>,
// Commit every n(>0) checkpoints, if n is not set, we will commit every checkpoint.
/// Commit every n(>0) checkpoints, if n is not set, we will commit every checkpoint.
#[serde(default, deserialize_with = "deserialize_optional_u64_from_string")]
pub commit_checkpoint_interval: Option<u64>,
}
Expand Down Expand Up @@ -295,7 +295,7 @@ impl Sink for DeltaLakeSink {
SinkDecouple::Disable => {
if config_decouple {
return Err(SinkError::Config(anyhow!(
"config conflict: DeltaLake config `commit_checkpoint_interval` bigger than 1 which means that must enable sink decouple, but session config sink decouple is disabled"
"config conflict: DeltaLake config `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
)));
}
Ok(false)
Expand Down
Loading
Loading