Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sink): decouple starrocks commit from risingwave commit #16816

Merged
merged 20 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,21 @@ use crate::sink::log_store::{LogStoreReadItem, TruncateOffset};
use crate::sink::writer::SinkWriter;
use crate::sink::{LogSinker, Result, SinkLogReader, SinkMetrics};

pub struct IcebergLogSinkerOf<W> {
pub struct DecoupleCheckpointLogSinkerOf<W> {
ly9chee marked this conversation as resolved.
Show resolved Hide resolved
writer: W,
sink_metrics: SinkMetrics,
commit_checkpoint_interval: NonZeroU64,
}

impl<W> IcebergLogSinkerOf<W> {
impl<W> DecoupleCheckpointLogSinkerOf<W> {
/// Create a log sinker with a commit checkpoint interval. The sinker should be used with a
/// decoupled log reader, `KvLogStoreReader`.
pub fn new(
writer: W,
sink_metrics: SinkMetrics,
commit_checkpoint_interval: NonZeroU64,
) -> Self {
IcebergLogSinkerOf {
DecoupleCheckpointLogSinkerOf {
writer,
sink_metrics,
commit_checkpoint_interval,
Expand All @@ -44,7 +44,7 @@ impl<W> IcebergLogSinkerOf<W> {
}

#[async_trait]
impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for IcebergLogSinkerOf<W> {
impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for DecoupleCheckpointLogSinkerOf<W> {
async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<()> {
let mut sink_writer = self.writer;
let sink_metrics = self.sink_metrics;
Expand Down
53 changes: 47 additions & 6 deletions src/connector/src/sink/deltalake.rs
fuyufjh marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use core::num::NonZeroU64;
use std::collections::HashMap;
use std::sync::Arc;

Expand All @@ -30,6 +31,7 @@ use risingwave_common::array::StreamChunk;
use risingwave_common::bail;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::Schema;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
Expand All @@ -39,13 +41,15 @@ use serde_derive::{Deserialize, Serialize};
use serde_with::serde_as;
use with_options::WithOptions;

use super::catalog::desc::SinkDesc;
use super::coordinate::CoordinatedSinkWriter;
use super::writer::{LogSinkerOf, SinkWriter};
use super::decouple_checkpoint_log_sink::DecoupleCheckpointLogSinkerOf;
use super::writer::SinkWriter;
use super::{
Result, Sink, SinkCommitCoordinator, SinkError, SinkParam, SinkWriterParam,
SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION,
};
use crate::sink::writer::SinkWriterExt;
use crate::deserialize_optional_u64_from_string;

pub const DELTALAKE_SINK: &str = "deltalake";
pub const DEFAULT_REGION: &str = "us-east-1";
Expand All @@ -65,6 +69,9 @@ pub struct DeltaLakeCommon {
pub s3_endpoint: Option<String>,
#[serde(rename = "gcs.service.account")]
pub gcs_service_account: Option<String>,
// Commit every n (> 0) checkpoints. If n is not set, we commit on every checkpoint.
ly9chee marked this conversation as resolved.
Show resolved Hide resolved
#[serde(default, deserialize_with = "deserialize_optional_u64_from_string")]
pub commit_checkpoint_interval: Option<u64>,
}
impl DeltaLakeCommon {
pub async fn create_deltalake_client(&self) -> Result<DeltaTable> {
Expand Down Expand Up @@ -269,18 +276,42 @@ fn check_field_type(rw_data_type: &DataType, dl_data_type: &DeltaLakeDataType) -

impl Sink for DeltaLakeSink {
type Coordinator = DeltaLakeSinkCommitter;
type LogSinker = LogSinkerOf<CoordinatedSinkWriter<DeltaLakeSinkWriter>>;
type LogSinker = DecoupleCheckpointLogSinkerOf<CoordinatedSinkWriter<DeltaLakeSinkWriter>>;

const SINK_NAME: &'static str = DELTALAKE_SINK;

/// Decide whether this sink runs in decoupled mode.
///
/// The decision combines two inputs:
/// - the sink property `commit_checkpoint_interval` read from `desc.properties`: a value
///   greater than 1 means the configuration itself asks for decoupling;
/// - the `user_specified` sink-decouple setting.
///
/// # Errors
///
/// Returns `SinkError::Config` when `commit_checkpoint_interval` is greater than 1 but
/// `user_specified` is `SinkDecouple::Disable`, since the two settings contradict each other.
fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
// A missing property, or a value that fails to parse as `u64` (treated as 0 here), does not
// request decoupling. NOTE(review): presumably malformed values are rejected by config
// validation elsewhere — confirm.
let config_decouple = if let Some(interval) =
desc.properties.get("commit_checkpoint_interval")
&& interval.parse::<u64>().unwrap_or(0) > 1
{
true
} else {
false
};

match user_specified {
// No explicit user choice: follow what the sink configuration implies.
SinkDecouple::Default => Ok(config_decouple),
SinkDecouple::Disable => {
// Explicitly disabled while the interval is greater than 1: report the conflict
// instead of silently picking one side.
if config_decouple {
return Err(SinkError::Config(anyhow!(
"config conflict: DeltaLake config `commit_checkpoint_interval` bigger than 1 which means that must enable sink decouple, but session config sink decouple is disabled"
ly9chee marked this conversation as resolved.
Show resolved Hide resolved
)));
}
Ok(false)
}
// Explicitly enabled: decouple regardless of the configured interval.
SinkDecouple::Enable => Ok(true),
}
}

async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
let inner = DeltaLakeSinkWriter::new(
self.config.clone(),
self.param.schema().clone(),
self.param.downstream_pk.clone(),
)
.await?;
Ok(CoordinatedSinkWriter::new(
let writer = CoordinatedSinkWriter::new(
writer_param
.meta_client
.expect("should have meta client")
Expand All @@ -294,8 +325,18 @@ impl Sink for DeltaLakeSink {
})?,
inner,
)
.await?
.into_log_sinker(writer_param.sink_metrics))
.await?;

let commit_checkpoint_interval =
NonZeroU64::new(self.config.common.commit_checkpoint_interval.unwrap_or(1)).expect(
"commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
);

Ok(DecoupleCheckpointLogSinkerOf::new(
writer,
writer_param.sink_metrics,
commit_checkpoint_interval,
))
}

async fn validate(&self) -> Result<()> {
Expand Down
Loading
Loading