Skip to content

Commit

Permalink
feat(sink): set parallelism of iceberg sink to 1 (risingwavelabs#8476)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Mar 13, 2023
1 parent a6c8c86 commit 32100f3
Showing 1 changed file with 15 additions and 2 deletions.
17 changes: 15 additions & 2 deletions src/frontend/src/optimizer/plan_node/stream_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use risingwave_connector::sink::{
SINK_FORMAT_APPEND_ONLY, SINK_FORMAT_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION,
};
use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;
use tracing::info;

use super::derive::{derive_columns, derive_pk};
use super::utils::IndicesDisplay;
Expand Down Expand Up @@ -72,10 +73,22 @@ impl StreamSink {
let required_dist = match input.distribution() {
Distribution::Single => RequiredDist::single(),
_ => {
assert_matches!(user_distributed_by, RequiredDist::Any);
RequiredDist::shard_by_key(input.schema().len(), input.logical_pk())
match properties.get("connector") {
Some(s) if s == "iceberg" => {
// iceberg with multiple parallelism will fail easily with concurrent commit
// on metadata
// TODO: reset iceberg sink to have multiple parallelism
info!("setting iceberg sink parallelism to singleton");
RequiredDist::single()
}
_ => {
assert_matches!(user_distributed_by, RequiredDist::Any);
RequiredDist::shard_by_key(input.schema().len(), input.logical_pk())
}
}
}
};

let input = required_dist.enforce_if_not_satisfies(input, &Order::any())?;
let columns = derive_columns(input.schema(), out_names, &user_cols)?;

Expand Down

0 comments on commit 32100f3

Please sign in to comment.