Skip to content

Commit 7f08c38

Browse files
authored
feat(sink): introduce exactly once iceberg sink (#19771)
1 parent ed4555e commit 7f08c38

File tree

24 files changed

+923
-198
lines changed

24 files changed

+923
-198
lines changed

Cargo.lock

Lines changed: 6 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -151,12 +151,12 @@ otlp-embedded = { git = "https://github.com/risingwavelabs/otlp-embedded", rev =
151151
prost = { version = "0.13" }
152152
prost-build = { version = "0.13" }
153153
# branch dev_rebase_main_20250325
154-
iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "c9da916ab29b13adcb137ba40bad9e2dc10309c4", features = [
154+
iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "14c3387c388f64e99dd2a811cdd5f554fe7680c3", features = [
155155
"storage-s3",
156156
"storage-gcs",
157157
] }
158-
iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "c9da916ab29b13adcb137ba40bad9e2dc10309c4" }
159-
iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "c9da916ab29b13adcb137ba40bad9e2dc10309c4" }
158+
iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "14c3387c388f64e99dd2a811cdd5f554fe7680c3" }
159+
iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "14c3387c388f64e99dd2a811cdd5f554fe7680c3" }
160160
opendal = "0.49"
161161
arrow-udf-js = "0.6.1"
162162
arrow-udf-wasm = { version = "0.5.1", features = ["build"] }

src/bench/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ plotters = { version = "0.3.5", default-features = false, features = [
2424
risingwave_common = { workspace = true }
2525
risingwave_connector = { workspace = true }
2626
risingwave_stream = { workspace = true }
27+
sea-orm = { workspace = true }
2728
serde = { version = "1", features = ["derive"] }
2829
serde_yaml = "0.9"
2930
thiserror-ext = { workspace = true }

src/bench/sink_bench/main.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ use risingwave_connector::source::{
6464
};
6565
use risingwave_stream::executor::test_utils::prelude::ColumnDesc;
6666
use risingwave_stream::executor::{Barrier, Message, MessageStreamItem, StreamExecutorError};
67+
use sea_orm::DatabaseConnection;
6768
use serde::{Deserialize, Deserializer};
6869
use thiserror_ext::AsReport;
6970
use tokio::sync::oneshot::Sender;
@@ -382,7 +383,7 @@ where
382383
<S as risingwave_connector::sink::Sink>::Coordinator: std::marker::Send,
383384
<S as risingwave_connector::sink::Sink>::Coordinator: 'static,
384385
{
385-
if let Ok(coordinator) = sink.new_coordinator().await {
386+
if let Ok(coordinator) = sink.new_coordinator(DatabaseConnection::Disconnected).await {
386387
sink_writer_param.meta_client = Some(SinkMetaClient::MockMetaClient(MockMetaClient::new(
387388
Box::new(coordinator),
388389
)));

src/connector/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ risingwave_common_estimate_size = { workspace = true }
111111
risingwave_common_rate_limit = { workspace = true }
112112
risingwave_connector_codec = { workspace = true }
113113
risingwave_jni_core = { workspace = true }
114+
risingwave_meta_model = { workspace = true }
114115
risingwave_pb = { workspace = true }
115116
risingwave_rpc_client = { workspace = true }
116117
rumqttc = { version = "0.24.0", features = ["url"] }
@@ -120,6 +121,7 @@ rustls-pemfile = "2"
120121
rustls-pki-types = "1"
121122
rw_futures_util = { workspace = true }
122123
scopeguard = "1"
124+
sea-orm = { workspace = true }
123125
sea-schema = { version = "0.16", default-features = false, features = [
124126
"discovery",
125127
"sqlx-postgres",

src/connector/src/sink/boxed.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use risingwave_common::array::StreamChunk;
2020
use risingwave_common::bitmap::Bitmap;
2121
use risingwave_pb::connector_service::SinkMetadata;
2222

23+
use super::SinkCommittedEpochSubscriber;
2324
use crate::sink::{SinkCommitCoordinator, SinkWriter};
2425

2526
pub type BoxWriter<CM> = Box<dyn SinkWriter<CommitMetadata = CM> + Send + 'static>;
@@ -52,8 +53,11 @@ impl<CM: 'static + Send> SinkWriter for BoxWriter<CM> {
5253

5354
#[async_trait]
5455
impl SinkCommitCoordinator for BoxCoordinator {
55-
async fn init(&mut self) -> crate::sink::Result<Option<u64>> {
56-
self.deref_mut().init().await
56+
async fn init(
57+
&mut self,
58+
subscriber: SinkCommittedEpochSubscriber,
59+
) -> crate::sink::Result<Option<u64>> {
60+
self.deref_mut().init(subscriber).await
5761
}
5862

5963
async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> crate::sink::Result<()> {

src/connector/src/sink/deltalake.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use risingwave_common::util::iter_util::ZipEqDebug;
3737
use risingwave_pb::connector_service::SinkMetadata;
3838
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
3939
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
40+
use sea_orm::DatabaseConnection;
4041
use serde_derive::{Deserialize, Serialize};
4142
use serde_with::{DisplayFromStr, serde_as};
4243
use with_options::WithOptions;
@@ -48,7 +49,7 @@ use super::decouple_checkpoint_log_sink::{
4849
use super::writer::SinkWriter;
4950
use super::{
5051
Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION, Sink, SinkCommitCoordinator,
51-
SinkError, SinkParam, SinkWriterMetrics, SinkWriterParam,
52+
SinkCommittedEpochSubscriber, SinkError, SinkParam, SinkWriterMetrics, SinkWriterParam,
5253
};
5354
use crate::connector_common::AwsAuthProps;
5455

@@ -384,7 +385,7 @@ impl Sink for DeltaLakeSink {
384385
true
385386
}
386387

387-
async fn new_coordinator(&self) -> Result<Self::Coordinator> {
388+
async fn new_coordinator(&self, _db: DatabaseConnection) -> Result<Self::Coordinator> {
388389
Ok(DeltaLakeSinkCommitter {
389390
table: self.config.common.create_deltalake_client().await?,
390391
})
@@ -496,7 +497,7 @@ pub struct DeltaLakeSinkCommitter {
496497

497498
#[async_trait::async_trait]
498499
impl SinkCommitCoordinator for DeltaLakeSinkCommitter {
499-
async fn init(&mut self) -> crate::sink::Result<Option<u64>> {
500+
async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
500501
tracing::info!("DeltaLake commit coordinator inited.");
501502
Ok(None)
502503
}

0 commit comments

Comments
 (0)