feat(sink): enable user-defined primary key for upsert sink #8610

Merged · 12 commits · Mar 17, 2023
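This PR renames the sink WITH-option `format` to `type` (accepting `append-only`, `upsert`, or `debezium`) and lets users declare the sink's primary key explicitly via a `primary_key` option, stored in the catalog as `downstream_pk` rather than the stream key derived from the materialize operator. A minimal sketch of the new usage, in the style of the e2e tests below (table, topic, and broker address are invented, and the exact `primary_key` value format is an assumption based on the error message added in kafka.rs):

statement ok
create table t (v1 int, v2 int);

statement ok
create sink s from t with (
    connector = 'kafka',
    properties.bootstrap.server = '127.0.0.1:29092',
    topic = 'sink_topic',
    type = 'upsert',
    primary_key = 'v1'
);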
16 changes: 8 additions & 8 deletions dashboard/proto/gen/catalog.ts

(Generated file; diff not rendered by default.)

14 changes: 7 additions & 7 deletions dashboard/proto/gen/stream_plan.ts

(Generated file; diff not rendered by default.)

2 changes: 1 addition & 1 deletion e2e_test/batch/explain.slt
@@ -5,7 +5,7 @@ statement ok
explain create index i on t(v);

statement ok
-explain create sink sink_t from t with ( connector = 'kafka', format = 'append_only' )
+explain create sink sink_t from t with ( connector = 'kafka', type = 'append-only' )

statement ok
drop table t;
4 changes: 2 additions & 2 deletions e2e_test/ddl/table.slt
@@ -39,10 +39,10 @@ statement ok
explain select v2 from ddl_t;

statement ok
-explain create sink sink_t from ddl_t with ( connector = 'kafka', format = 'append_only', force_append_only = 'true' );
+explain create sink sink_t from ddl_t with ( connector = 'kafka', type = 'append-only', force_append_only = 'true' );

statement ok
-explain create sink sink_as as select sum(v2) as sum from ddl_t with ( connector = 'kafka', format = 'append_only', force_append_only = 'true' );
+explain create sink sink_as as select sum(v2) as sum from ddl_t with ( connector = 'kafka', type = 'append-only', force_append_only = 'true' );

# Create a mview with duplicated name.
statement error
38 changes: 7 additions & 31 deletions e2e_test/sink/append_only_sink.slt
@@ -1,35 +1,20 @@
statement ok
-create table t1 (v1 int, v2 int);
-
-statement error No primary key for the upsert sink
-create sink s1 from t1 with (connector = 'console');
-
-statement ok
-create sink s1 as select v1, v2, _row_id from t1 with (connector = 'console');
-
-statement ok
-create table t2 (v1 int, v2 int primary key);
-
-statement ok
-create sink s2 from t2 with (connector = 'console');
-
-statement error No primary key for the upsert sink
-create sink s3 as select avg(v1) from t2 with (connector = 'console');
+create table t (v1 int, v2 int);

statement ok
-create sink s3 as select avg(v1) from t2 with (connector = 'console', format = 'append_only', force_append_only = 'true');
+create sink s1 from t with (connector = 'console');

statement ok
-create sink s4 as select avg(v1), v2 from t2 group by v2 with (connector = 'console');
+create sink s2 as select avg(v1), v2 from t group by v2 with (connector = 'console');

statement error The sink cannot be append-only
-create sink s5 from t2 with (connector = 'console', format = 'append_only');
+create sink s3 from t with (connector = 'console', type = 'append-only');

statement ok
-create sink s5 from t2 with (connector = 'console', format = 'append_only', force_append_only = 'true');
+create sink s3 from t with (connector = 'console', type = 'append-only', force_append_only = 'true');

statement error Cannot force the sink to be append-only
-create sink s6 from t2 with (connector = 'console', format = 'upsert', force_append_only = 'true');
+create sink s4 from t with (connector = 'console', type = 'upsert', force_append_only = 'true');

statement ok
drop sink s1
@@ -41,13 +26,4 @@ statement ok
drop sink s3

statement ok
-drop sink s4
-
-statement ok
-drop sink s5
-
-statement ok
-drop table t1
-
-statement ok
-drop table t2
+drop table t
2 changes: 1 addition & 1 deletion e2e_test/source/basic/kafka.slt
@@ -79,7 +79,7 @@ from
s5 with (
properties.bootstrap.server = '127.0.0.1:29092',
topic = 'sink_target',
-format = 'append_only',
+type = 'append-only',
connector = 'kafka'
)

4 changes: 2 additions & 2 deletions proto/catalog.proto
@@ -76,8 +76,8 @@ message Sink {
repeated common.ColumnOrder pk = 6;
repeated uint32 dependent_relations = 7;
repeated int32 distribution_key = 8;
-// pk_indices of the corresponding materialize operator's output.
-repeated int32 stream_key = 9;
+// User-defined primary key indices for upsert sink.
+repeated int32 downstream_pk = 9;
SinkType sink_type = 10;
uint32 owner = 11;
map<string, string> properties = 12;
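To make the renamed field concrete: `downstream_pk` holds the user-declared key as indices into the sink's output columns, instead of a stream key inherited from the materialize operator. A hypothetical slt illustration (0-based indexing is an assumption, consistent with the `Vec<usize>` fields in the Rust catalog below):

statement ok
create sink s from t with (connector = 'kafka', properties.bootstrap.server = '127.0.0.1:29092', topic = 'sink_topic', type = 'upsert', primary_key = 'v2');

# With output columns [v1, v2], the catalog would record downstream_pk = [1].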
2 changes: 1 addition & 1 deletion proto/stream_plan.proto
@@ -143,7 +143,7 @@ message SinkDesc {
string definition = 3;
repeated plan_common.ColumnDesc columns = 4;
repeated common.ColumnOrder pk = 5;
-repeated uint32 stream_key = 6;
+repeated uint32 downstream_pk = 6;
repeated uint32 distribution_key = 7;
map<string, string> properties = 8;
catalog.SinkType sink_type = 9;
8 changes: 4 additions & 4 deletions src/connector/src/sink/catalog/desc.rs
@@ -40,8 +40,8 @@
/// order (ASC/DESC).
pub pk: Vec<ColumnOrder>,

-/// Primary key indices of the corresponding sink operator's output.
-pub stream_key: Vec<usize>,
+/// User-defined primary key indices for upsert sink.
+pub downstream_pk: Vec<usize>,

/// Distribution key indices of the sink. For example, if `distribution_key = [1, 2]`, then the
/// distribution keys will be `columns[1]` and `columns[2]`.
@@ -72,7 +72,7 @@ impl SinkDesc {
definition: self.definition,
columns: self.columns,
pk: self.pk,
-stream_key: self.stream_key,
+downstream_pk: self.downstream_pk,
distribution_key: self.distribution_key,
owner,
dependent_relations,
@@ -92,7 +92,7 @@ impl SinkDesc {
.map(|column| Into::<ProstColumnDesc>::into(&column.column_desc))
.collect_vec(),
pk: self.pk.iter().map(|k| k.to_protobuf()).collect_vec(),
-stream_key: self.stream_key.iter().map(|idx| *idx as _).collect_vec(),
+downstream_pk: self.downstream_pk.iter().map(|idx| *idx as _).collect_vec(),
distribution_key: self.distribution_key.iter().map(|k| *k as _).collect_vec(),
properties: self.properties.clone().into_iter().collect(),
sink_type: self.sink_type.to_proto() as i32,
16 changes: 10 additions & 6 deletions src/connector/src/sink/catalog/mod.rs
@@ -119,8 +119,8 @@ pub struct SinkCatalog {
/// order (ASC/DESC).
pub pk: Vec<ColumnOrder>,

-/// Primary key indices of the corresponding sink operator's output.
-pub stream_key: Vec<usize>,
+/// User-defined primary key indices for upsert sink.
+pub downstream_pk: Vec<usize>,

/// Distribution key indices of the sink. For example, if `distribution_key = [1, 2]`, then the
/// distribution keys will be `columns[1]` and `columns[2]`.
@@ -151,7 +151,11 @@ impl SinkCatalog {
definition: self.definition.clone(),
columns: self.columns.iter().map(|c| c.to_protobuf()).collect_vec(),
pk: self.pk.iter().map(|o| o.to_protobuf()).collect(),
-stream_key: self.stream_key.iter().map(|idx| *idx as i32).collect_vec(),
+downstream_pk: self
+.downstream_pk
+.iter()
+.map(|idx| *idx as i32)
+.collect_vec(),
dependent_relations: self
.dependent_relations
.iter()
@@ -177,8 +181,8 @@ impl SinkCatalog {
Schema { fields }
}

-pub fn pk_indices(&self) -> Vec<usize> {
-self.pk.iter().map(|k| k.column_index).collect_vec()
+pub fn downstream_pk_indices(&self) -> Vec<usize> {
+self.downstream_pk.clone()
}
}

@@ -197,7 +201,7 @@ impl From<ProstSink> for SinkCatalog {
.map(ColumnCatalog::from)
.collect_vec(),
pk: pb.pk.iter().map(ColumnOrder::from_protobuf).collect_vec(),
-stream_key: pb.stream_key.iter().map(|k| *k as _).collect_vec(),
+downstream_pk: pb.downstream_pk.iter().map(|k| *k as _).collect_vec(),
distribution_key: pb.distribution_key.iter().map(|k| *k as _).collect_vec(),
properties: pb.properties.clone(),
owner: pb.owner.into(),
38 changes: 29 additions & 9 deletions src/connector/src/sink/kafka.rs
@@ -33,7 +33,9 @@ use serde_derive::Deserialize;
use serde_json::{json, Map, Value};
use tracing::warn;

-use super::{Sink, SinkError, SINK_FORMAT_APPEND_ONLY, SINK_FORMAT_DEBEZIUM, SINK_FORMAT_UPSERT};
+use super::{
+Sink, SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
+};
use crate::common::KafkaCommon;
use crate::sink::Result;
use crate::{deserialize_bool_from_string, deserialize_duration_from_string};
@@ -61,7 +63,7 @@ pub struct KafkaConfig {
#[serde(flatten)]
pub common: KafkaCommon,

-pub format: String, // accept "append_only", "debezium", or "upsert"
+pub r#type: String, // accept "append-only", "debezium", or "upsert"

pub identifier: String,

@@ -94,12 +96,16 @@ impl KafkaConfig {
let config = serde_json::from_value::<KafkaConfig>(serde_json::to_value(values).unwrap())
.map_err(|e| SinkError::Config(anyhow!(e)))?;

-if config.format != SINK_FORMAT_APPEND_ONLY
-&& config.format != SINK_FORMAT_DEBEZIUM
-&& config.format != SINK_FORMAT_UPSERT
+if config.r#type != SINK_TYPE_APPEND_ONLY
+&& config.r#type != SINK_TYPE_DEBEZIUM
+&& config.r#type != SINK_TYPE_UPSERT
{
return Err(SinkError::Config(anyhow!(
"format must be append_only, debezium, or upsert"
"`{}` must be {}, {}, or {}",
SINK_TYPE_OPTION,
SINK_TYPE_APPEND_ONLY,
SINK_TYPE_DEBEZIUM,
SINK_TYPE_UPSERT
)));
}
Ok(config)
@@ -134,6 +140,20 @@ impl<const APPEND_ONLY: bool> KafkaSink<APPEND_ONLY> {
})
}

+pub async fn validate(config: KafkaConfig, pk_indices: Vec<usize>) -> Result<()> {
+// For upsert Kafka sink, the primary key must be defined.
+if !APPEND_ONLY && pk_indices.is_empty() {
+return Err(SinkError::Config(anyhow!(
+"primary key not defined for upsert kafka sink (please define in `primary_key` field)"
+)));
+}
+
+// Try Kafka connection.
+KafkaTransactionConductor::new(config).await?;
Review comment (Contributor): can we reuse the conductor instance? it has some overhead to recreate a connection to kafka broker

Reply (Contributor, Author): May refine this in the future.

+Ok(())
+}

// any error should report to upper level and requires revert to previous epoch.
pub async fn do_with_retry<'a, F, FutKR, T>(&'a self, f: F) -> KafkaResult<T>
where
@@ -305,7 +325,7 @@ impl<const APPEND_ONLY: bool> Sink for KafkaSink<APPEND_ONLY> {
self.append_only(chunk).await
} else {
// Debezium
-if self.config.format == SINK_FORMAT_DEBEZIUM {
+if self.config.r#type == SINK_TYPE_DEBEZIUM {
self.debezium_update(
chunk,
SystemTime::now()
@@ -598,7 +618,7 @@ mod test {
let properties: HashMap<String, String> = hashmap! {
"properties.bootstrap.server".to_string() => "localhost:9092".to_string(),
"topic".to_string() => "test".to_string(),
"format".to_string() => "append_only".to_string(),
"type".to_string() => "append-only".to_string(),
"use_transaction".to_string() => "False".to_string(),
"security_protocol".to_string() => "SASL".to_string(),
"sasl_mechanism".to_string() => "SASL".to_string(),
@@ -618,7 +638,7 @@
let properties = hashmap! {
"kafka.brokers".to_string() => "localhost:29092".to_string(),
"identifier".to_string() => "test_sink_1".to_string(),
"sink.type".to_string() => "append_only".to_string(),
"type".to_string() => "append-only".to_string(),
"kafka.topic".to_string() => "test_topic".to_string(),
};
let schema = Schema::new(vec![
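A sketch of how the new validate check would surface in an e2e test (statements are illustrative and assume a reachable broker, since validate also opens a Kafka connection to probe it; the error text matches the message introduced above):

statement error primary key not defined for upsert kafka sink
create sink s_bad from t with (connector = 'kafka', properties.bootstrap.server = '127.0.0.1:29092', topic = 'test_topic', type = 'upsert');

statement ok
create sink s_ok from t with (connector = 'kafka', properties.bootstrap.server = '127.0.0.1:29092', topic = 'test_topic', type = 'upsert', primary_key = 'v1');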