From 7f0bc7b47930c395bcb5f30610b8038b2e718664 Mon Sep 17 00:00:00 2001 From: xx01cyx Date: Fri, 17 Mar 2023 01:58:06 +0000 Subject: [PATCH 01/11] enable user to specify sink pk --- e2e_test/batch/explain.slt | 2 +- e2e_test/ddl/table.slt | 4 +- e2e_test/sink/append_only_sink.slt | 38 ++------- e2e_test/source/basic/kafka.slt | 2 +- proto/catalog.proto | 4 +- proto/stream_plan.proto | 2 +- src/connector/src/sink/catalog/desc.rs | 8 +- src/connector/src/sink/catalog/mod.rs | 16 ++-- src/connector/src/sink/kafka.rs | 34 ++++++-- src/connector/src/sink/mod.rs | 19 ++--- src/connector/src/sink/remote.rs | 2 +- .../src/optimizer/plan_node/stream_sink.rs | 82 ++++++++++++++++--- src/tests/simulation/tests/it/sink.rs | 4 +- 13 files changed, 136 insertions(+), 81 deletions(-) diff --git a/e2e_test/batch/explain.slt b/e2e_test/batch/explain.slt index 00e735d360aa0..cff6af31f2470 100644 --- a/e2e_test/batch/explain.slt +++ b/e2e_test/batch/explain.slt @@ -5,7 +5,7 @@ statement ok explain create index i on t(v); statement ok -explain create sink sink_t from t with ( connector = 'kafka', format = 'append_only' ) +explain create sink sink_t from t with ( connector = 'kafka', type = 'append-only' ) statement ok drop table t; diff --git a/e2e_test/ddl/table.slt b/e2e_test/ddl/table.slt index dacebfe493b36..9d1cd172e1b5c 100644 --- a/e2e_test/ddl/table.slt +++ b/e2e_test/ddl/table.slt @@ -39,10 +39,10 @@ statement ok explain select v2 from ddl_t; statement ok -explain create sink sink_t from ddl_t with ( connector = 'kafka', format = 'append_only', force_append_only = 'true' ); +explain create sink sink_t from ddl_t with ( connector = 'kafka', type = 'append-only', force_append_only = 'true' ); statement ok -explain create sink sink_as as select sum(v2) as sum from ddl_t with ( connector = 'kafka', format = 'append_only', force_append_only = 'true' ); +explain create sink sink_as as select sum(v2) as sum from ddl_t with ( connector = 'kafka', type = 'append-only', force_append_only = 'true' ); # Create a mview with duplicated name. 
statement error diff --git a/e2e_test/sink/append_only_sink.slt b/e2e_test/sink/append_only_sink.slt index cf4e185d1b28f..57f194f033f4f 100644 --- a/e2e_test/sink/append_only_sink.slt +++ b/e2e_test/sink/append_only_sink.slt @@ -1,35 +1,20 @@ statement ok -create table t1 (v1 int, v2 int); - -statement error No primary key for the upsert sink -create sink s1 from t1 with (connector = 'console'); - -statement ok -create sink s1 as select v1, v2, _row_id from t1 with (connector = 'console'); - -statement ok -create table t2 (v1 int, v2 int primary key); - -statement ok -create sink s2 from t2 with (connector = 'console'); - -statement error No primary key for the upsert sink -create sink s3 as select avg(v1) from t2 with (connector = 'console'); +create table t (v1 int, v2 int); statement ok -create sink s3 as select avg(v1) from t2 with (connector = 'console', format = 'append_only', force_append_only = 'true'); +create sink s1 from t with (connector = 'console'); statement ok -create sink s4 as select avg(v1), v2 from t2 group by v2 with (connector = 'console'); +create sink s2 as select avg(v1), v2 from t group by v2 with (connector = 'console'); statement error The sink cannot be append-only -create sink s5 from t2 with (connector = 'console', format = 'append_only'); +create sink s3 from t with (connector = 'console', type = 'append-only'); statement ok -create sink s5 from t2 with (connector = 'console', format = 'append_only', force_append_only = 'true'); +create sink s3 from t with (connector = 'console', type = 'append-only', force_append_only = 'true'); statement error Cannot force the sink to be append-only -create sink s6 from t2 with (connector = 'console', format = 'upsert', force_append_only = 'true'); +create sink s4 from t with (connector = 'console', type = 'upsert', force_append_only = 'true'); statement ok drop sink s1 @@ -41,13 +26,4 @@ statement ok drop sink s3 statement ok -drop sink s4 - -statement ok -drop sink s5 - -statement ok -drop table t1 - -statement ok -drop table t2 +drop table t diff --git a/e2e_test/source/basic/kafka.slt b/e2e_test/source/basic/kafka.slt index 0e6f7e6bca00b..96f9f8e3bec05 100644 --- a/e2e_test/source/basic/kafka.slt +++ b/e2e_test/source/basic/kafka.slt @@ -79,7 +79,7 @@ from s5 with ( properties.bootstrap.server = '127.0.0.1:29092', topic = 'sink_target', - format = 'append_only', + type = 'append-only', connector = 'kafka' ) diff --git a/proto/catalog.proto b/proto/catalog.proto index 0501abfbb892d..31f6667db2f15 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -76,8 +76,8 @@ message Sink { repeated common.ColumnOrder pk = 6; repeated uint32 dependent_relations = 7; repeated int32 distribution_key = 8; - // pk_indices of the corresponding materialize operator's output. - repeated int32 stream_key = 9; + // User-defined primary key indices for upsert sink. 
+ repeated int32 downstream_pk = 9; SinkType sink_type = 10; uint32 owner = 11; map properties = 12; diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index f865ab992e014..7bc96129cb778 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -143,7 +143,7 @@ message SinkDesc { string definition = 3; repeated plan_common.ColumnDesc columns = 4; repeated common.ColumnOrder pk = 5; - repeated uint32 stream_key = 6; + repeated uint32 downstream_pk = 6; repeated uint32 distribution_key = 7; map properties = 8; catalog.SinkType sink_type = 9; diff --git a/src/connector/src/sink/catalog/desc.rs b/src/connector/src/sink/catalog/desc.rs index 7b975081c02a9..a762ad19b5271 100644 --- a/src/connector/src/sink/catalog/desc.rs +++ b/src/connector/src/sink/catalog/desc.rs @@ -40,8 +40,8 @@ pub struct SinkDesc { /// order (ASC/DESC). pub pk: Vec, - /// Primary key indices of the corresponding sink operator's output. - pub stream_key: Vec, + /// User-defined primary key indices for upsert sink. + pub downstream_pk: Vec, /// Distribution key indices of the sink. For example, if `distribution_key = [1, 2]`, then the /// distribution keys will be `columns[1]` and `columns[2]`. @@ -72,7 +72,7 @@ impl SinkDesc { definition: self.definition, columns: self.columns, pk: self.pk, - stream_key: self.stream_key, + downstream_pk: self.downstream_pk, distribution_key: self.distribution_key, owner, dependent_relations, @@ -92,7 +92,7 @@ impl SinkDesc { .map(|column| Into::::into(&column.column_desc)) .collect_vec(), pk: self.pk.iter().map(|k| k.to_protobuf()).collect_vec(), - stream_key: self.stream_key.iter().map(|idx| *idx as _).collect_vec(), + downstream_pk: self.downstream_pk.iter().map(|idx| *idx as _).collect_vec(), distribution_key: self.distribution_key.iter().map(|k| *k as _).collect_vec(), properties: self.properties.clone().into_iter().collect(), sink_type: self.sink_type.to_proto() as i32, diff --git a/src/connector/src/sink/catalog/mod.rs b/src/connector/src/sink/catalog/mod.rs index dbcca4ca5a50f..6bb6968e0ce31 100644 --- a/src/connector/src/sink/catalog/mod.rs +++ b/src/connector/src/sink/catalog/mod.rs @@ -119,8 +119,8 @@ pub struct SinkCatalog { /// order (ASC/DESC). pub pk: Vec, - /// Primary key indices of the corresponding sink operator's output. - pub stream_key: Vec, + /// User-defined primary key indices for upsert sink. + pub downstream_pk: Vec, /// Distribution key indices of the sink. For example, if `distribution_key = [1, 2]`, then the /// distribution keys will be `columns[1]` and `columns[2]`. 
@@ -151,7 +151,11 @@ impl SinkCatalog { definition: self.definition.clone(), columns: self.columns.iter().map(|c| c.to_protobuf()).collect_vec(), pk: self.pk.iter().map(|o| o.to_protobuf()).collect(), - stream_key: self.stream_key.iter().map(|idx| *idx as i32).collect_vec(), + downstream_pk: self + .downstream_pk + .iter() + .map(|idx| *idx as i32) + .collect_vec(), dependent_relations: self .dependent_relations .iter() @@ -177,8 +181,8 @@ impl SinkCatalog { Schema { fields } } - pub fn pk_indices(&self) -> Vec { - self.pk.iter().map(|k| k.column_index).collect_vec() + pub fn downstream_pk_indices(&self) -> Vec { + self.downstream_pk.clone() } } @@ -197,7 +201,7 @@ impl From for SinkCatalog { .map(ColumnCatalog::from) .collect_vec(), pk: pb.pk.iter().map(ColumnOrder::from_protobuf).collect_vec(), - stream_key: pb.stream_key.iter().map(|k| *k as _).collect_vec(), + downstream_pk: pb.downstream_pk.iter().map(|k| *k as _).collect_vec(), distribution_key: pb.distribution_key.iter().map(|k| *k as _).collect_vec(), properties: pb.properties.clone(), owner: pb.owner.into(), diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 0a429e4ce7bb8..8e8662c48c5eb 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -33,7 +33,9 @@ use serde_derive::Deserialize; use serde_json::{json, Map, Value}; use tracing::warn; -use super::{Sink, SinkError, SINK_FORMAT_APPEND_ONLY, SINK_FORMAT_DEBEZIUM, SINK_FORMAT_UPSERT}; +use super::{ + Sink, SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, +}; use crate::common::KafkaCommon; use crate::sink::Result; use crate::{deserialize_bool_from_string, deserialize_duration_from_string}; @@ -61,7 +63,7 @@ pub struct KafkaConfig { #[serde(flatten)] pub common: KafkaCommon, - pub format: String, // accept "append_only", "debezium", or "upsert" + pub r#type: String, // accept "append-only", "debezium", or "upsert" pub identifier: String, @@ -94,12 +96,16 @@ impl KafkaConfig { let config = serde_json::from_value::(serde_json::to_value(values).unwrap()) .map_err(|e| SinkError::Config(anyhow!(e)))?; - if config.format != SINK_FORMAT_APPEND_ONLY - && config.format != SINK_FORMAT_DEBEZIUM - && config.format != SINK_FORMAT_UPSERT + if config.r#type != SINK_TYPE_APPEND_ONLY + && config.r#type != SINK_TYPE_DEBEZIUM + && config.r#type != SINK_TYPE_UPSERT { return Err(SinkError::Config(anyhow!( - "format must be append_only, debezium, or upsert" + "`{}` must be {}, {}, or {}", + SINK_TYPE_OPTION, + SINK_TYPE_APPEND_ONLY, + SINK_TYPE_DEBEZIUM, + SINK_TYPE_UPSERT ))); } Ok(config) @@ -134,6 +140,20 @@ impl KafkaSink { }) } + pub async fn validate(config: KafkaConfig, pk_indices: Vec) -> Result<()> { + // For upsert Kafka sink, the primary key must be defined. + if !APPEND_ONLY && pk_indices.is_empty() { + return Err(SinkError::Config(anyhow!( + "primary key not defined for upsert kafka sink (please define in `primary_key` field)" + ))); + } + + // Try Kafka connection. + KafkaTransactionConductor::new(config).await?; + + Ok(()) + } + // any error should report to upper level and requires revert to previous epoch. 
pub async fn do_with_retry<'a, F, FutKR, T>(&'a self, f: F) -> KafkaResult where @@ -305,7 +325,7 @@ impl Sink for KafkaSink { self.append_only(chunk).await } else { // Debezium - if self.config.format == SINK_FORMAT_DEBEZIUM { + if self.config.r#type == SINK_TYPE_DEBEZIUM { self.debezium_update( chunk, SystemTime::now() diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 57fb456de880f..6ab5da3431d9e 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -38,10 +38,10 @@ use crate::sink::redis::{RedisConfig, RedisSink}; use crate::sink::remote::{RemoteConfig, RemoteSink}; use crate::ConnectorParams; -pub const SINK_FORMAT_OPTION: &str = "format"; -pub const SINK_FORMAT_APPEND_ONLY: &str = "append_only"; -pub const SINK_FORMAT_DEBEZIUM: &str = "debezium"; -pub const SINK_FORMAT_UPSERT: &str = "upsert"; +pub const SINK_TYPE_OPTION: &str = "type"; +pub const SINK_TYPE_APPEND_ONLY: &str = "append-only"; +pub const SINK_TYPE_DEBEZIUM: &str = "debezium"; +pub const SINK_TYPE_UPSERT: &str = "upsert"; pub const SINK_USER_FORCE_APPEND_ONLY_OPTION: &str = "force_append_only"; #[async_trait] @@ -82,10 +82,10 @@ pub const BLACKHOLE_SINK: &str = "blackhole"; impl SinkConfig { pub fn from_hashmap(properties: HashMap) -> Result { - const SINK_TYPE_KEY: &str = "connector"; + const CONNECTOR_TYPE_KEY: &str = "connector"; let sink_type = properties - .get(SINK_TYPE_KEY) - .ok_or_else(|| SinkError::Config(anyhow!("missing config: {}", SINK_TYPE_KEY)))?; + .get(CONNECTOR_TYPE_KEY) + .ok_or_else(|| SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)))?; match sink_type.to_lowercase().as_str() { KAFKA_SINK => Ok(SinkConfig::Kafka(Box::new(KafkaConfig::from_hashmap( properties, @@ -169,13 +169,12 @@ impl SinkImpl { match cfg { SinkConfig::Redis(cfg) => RedisSink::new(cfg, sink_catalog.schema()).map(|_| ()), SinkConfig::Kafka(cfg) => { - // We simply call `KafkaSink::new` here to validate a Kafka sink. 
if sink_catalog.sink_type.is_append_only() { - KafkaSink::::new(*cfg, sink_catalog.schema(), sink_catalog.pk_indices()) + KafkaSink::::validate(*cfg, sink_catalog.downstream_pk_indices()) .await .map(|_| ()) } else { - KafkaSink::::new(*cfg, sink_catalog.schema(), sink_catalog.pk_indices()) + KafkaSink::::validate(*cfg, sink_catalog.downstream_pk_indices()) .await .map(|_| ()) } diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 198d7081a2d58..d96184c5a8fa8 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -190,7 +190,7 @@ impl RemoteSink { let table_schema = TableSchema { columns, pk_indices: sink_catalog - .pk_indices() + .downstream_pk_indices() .iter() .map(|i| *i as _) .collect_vec(), diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index 36f20b785f85a..e7545d780ab4f 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -23,7 +23,8 @@ use risingwave_common::error::{ErrorCode, Result}; use risingwave_connector::sink::catalog::desc::SinkDesc; use risingwave_connector::sink::catalog::{SinkId, SinkType}; use risingwave_connector::sink::{ - SINK_FORMAT_APPEND_ONLY, SINK_FORMAT_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION, + SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, + SINK_USER_FORCE_APPEND_ONLY_OPTION, }; use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; use tracing::info; @@ -112,17 +113,13 @@ impl StreamSink { definition: String, properties: WithOptions, ) -> Result { + const DOWNSTREAM_PK_KEY: &str = "primary_key"; + let distribution_key = input.distribution().dist_column_indices().to_vec(); let sink_type = Self::derive_sink_type(input.append_only(), &properties)?; - let (pk, stream_key) = derive_pk(input, user_order_by, &columns); - - if sink_type == SinkType::Upsert && pk.is_empty() { - return Err(ErrorCode::SinkError(Box::new(Error::new( - ErrorKind::InvalidInput, - "No primary key for the upsert sink. 
Please include the primary key explicitly in sink definition or make the sink append-only.", - ))) - .into()); - } + let (pk, _) = derive_pk(input, user_order_by, &columns); + + let downstream_pk = Self::parse_downstream_pk(&columns, properties.get(DOWNSTREAM_PK_KEY))?; Ok(SinkDesc { id: SinkId::placeholder(), @@ -130,7 +127,7 @@ impl StreamSink { definition, columns, pk, - stream_key, + downstream_pk, distribution_key, properties: properties.into_inner(), sink_type, @@ -138,9 +135,28 @@ impl StreamSink { } fn derive_sink_type(input_append_only: bool, properties: &WithOptions) -> Result { + if let Some(sink_type) = properties.get(SINK_TYPE_OPTION) { + if sink_type != SINK_TYPE_APPEND_ONLY + && sink_type != SINK_TYPE_DEBEZIUM + && sink_type != SINK_TYPE_UPSERT + { + return Err(ErrorCode::SinkError(Box::new(Error::new( + ErrorKind::InvalidInput, + format!( + "`{}` must be {}, {}, or {}", + SINK_TYPE_OPTION, + SINK_TYPE_APPEND_ONLY, + SINK_TYPE_DEBEZIUM, + SINK_TYPE_UPSERT + ), + ))) + .into()); + } + } + let frontend_derived_append_only = input_append_only; let user_defined_append_only = - properties.value_eq_ignore_case(SINK_FORMAT_OPTION, SINK_FORMAT_APPEND_ONLY); + properties.value_eq_ignore_case(SINK_TYPE_OPTION, SINK_TYPE_APPEND_ONLY); let user_force_append_only = properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "true"); @@ -162,12 +178,52 @@ impl StreamSink { (_, false, true) => { Err(ErrorCode::SinkError(Box::new(Error::new( ErrorKind::InvalidInput, - "Cannot force the sink to be append-only without \"format='append_only'\"in WITH options.", + "Cannot force the sink to be append-only without \"type='append-only'\"in WITH options.", ))) .into()) } } } + + /// Extract user-defined downstream pk columns from with options. Return the indices of the pk + /// columns. + fn parse_downstream_pk( + columns: &[ColumnCatalog], + downstream_pk_str: Option<&String>, + ) -> Result> { + match downstream_pk_str { + Some(downstream_pk_str) => { + // If the user defines the downstream primary key, we find out their indices. + let downstream_pk = downstream_pk_str.split(',').collect_vec(); + let mut downstream_pk_indices = Vec::with_capacity(downstream_pk.len()); + for key in downstream_pk { + let trimmed_key = key.trim(); + if trimmed_key.is_empty() { + continue; + } + match columns + .iter() + .position(|col| col.column_desc.name == trimmed_key) + { + Some(index) => downstream_pk_indices.push(index), + None => { + return Err(ErrorCode::SinkError(Box::new(Error::new( + ErrorKind::InvalidInput, + format!("Sink primary key column not found: {}. Please use ',' as the delimiter for different primary key columns.", trimmed_key), + ))) + .into()); + } + } + } + Ok(downstream_pk_indices) + } + None => { + // The user doesn't define the downstream primary key and we simply return an empty + // vector. 
+ Ok(Vec::new()) + } + } + } } impl PlanTreeNodeUnary for StreamSink { diff --git a/src/tests/simulation/tests/it/sink.rs b/src/tests/simulation/tests/it/sink.rs index a92043c3c5a90..ac95d98cffade 100644 --- a/src/tests/simulation/tests/it/sink.rs +++ b/src/tests/simulation/tests/it/sink.rs @@ -28,9 +28,9 @@ use risingwave_simulation::cluster::{Cluster, Configuration}; use risingwave_simulation::ctl_ext::predicate::{identity_contains, no_identity_contains}; const ROOT_TABLE_CREATE: &str = "create table t (v1 int) append only;"; -const APPEND_ONLY_SINK_CREATE: &str = "create sink s1 from t with (connector='kafka', properties.bootstrap.server='192.168.11.1:29092', topic='t_sink_append_only', format='append_only');"; +const APPEND_ONLY_SINK_CREATE: &str = "create sink s1 from t with (connector='kafka', properties.bootstrap.server='192.168.11.1:29092', topic='t_sink_append_only', type='append-only');"; const MV_CREATE: &str = "create materialized view m as select count(*) from t;"; -const DEBEZIUM_SINK_CREATE: &str = "create sink s2 from m with (connector='kafka', properties.bootstrap.server='192.168.11.1:29092', topic='t_sink_debezium', format='debezium');"; +const DEBEZIUM_SINK_CREATE: &str = "create sink s2 from m with (connector='kafka', properties.bootstrap.server='192.168.11.1:29092', topic='t_sink_debezium', type='debezium');"; const APPEND_ONLY_TOPIC: &str = "t_sink_append_only"; const DEBEZIUM_TOPIC: &str = "t_sink_debezium"; From 683e51273669c9b705223fcb1b7cbd67302774b5 Mon Sep 17 00:00:00 2001 From: xx01cyx Date: Fri, 17 Mar 2023 01:58:27 +0000 Subject: [PATCH 02/11] dashboard --- dashboard/proto/gen/catalog.ts | 16 ++++++++-------- dashboard/proto/gen/stream_plan.ts | 14 +++++++------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/dashboard/proto/gen/catalog.ts b/dashboard/proto/gen/catalog.ts index bcd7c1f140abe..eb4ed2e147db8 100644 --- a/dashboard/proto/gen/catalog.ts +++ b/dashboard/proto/gen/catalog.ts @@ -168,8 +168,8 @@ export interface Sink { pk: ColumnOrder[]; dependentRelations: number[]; distributionKey: number[]; - /** pk_indices of the corresponding materialize operator's output. */ - streamKey: number[]; + /** User-defined primary key indices for upsert sink. */ + downstreamPk: number[]; sinkType: SinkType; owner: number; properties: { [key: string]: string }; @@ -621,7 +621,7 @@ function createBaseSink(): Sink { pk: [], dependentRelations: [], distributionKey: [], - streamKey: [], + downstreamPk: [], sinkType: SinkType.UNSPECIFIED, owner: 0, properties: {}, @@ -644,7 +644,7 @@ export const Sink = { distributionKey: Array.isArray(object?.distributionKey) ? object.distributionKey.map((e: any) => Number(e)) : [], - streamKey: Array.isArray(object?.streamKey) ? object.streamKey.map((e: any) => Number(e)) : [], + downstreamPk: Array.isArray(object?.downstreamPk) ? object.downstreamPk.map((e: any) => Number(e)) : [], sinkType: isSet(object.sinkType) ? sinkTypeFromJSON(object.sinkType) : SinkType.UNSPECIFIED, owner: isSet(object.owner) ? 
Number(object.owner) : 0, properties: isObject(object.properties) @@ -683,10 +683,10 @@ export const Sink = { } else { obj.distributionKey = []; } - if (message.streamKey) { - obj.streamKey = message.streamKey.map((e) => Math.round(e)); + if (message.downstreamPk) { + obj.downstreamPk = message.downstreamPk.map((e) => Math.round(e)); } else { - obj.streamKey = []; + obj.downstreamPk = []; } message.sinkType !== undefined && (obj.sinkType = sinkTypeToJSON(message.sinkType)); message.owner !== undefined && (obj.owner = Math.round(message.owner)); @@ -710,7 +710,7 @@ export const Sink = { message.pk = object.pk?.map((e) => ColumnOrder.fromPartial(e)) || []; message.dependentRelations = object.dependentRelations?.map((e) => e) || []; message.distributionKey = object.distributionKey?.map((e) => e) || []; - message.streamKey = object.streamKey?.map((e) => e) || []; + message.downstreamPk = object.downstreamPk?.map((e) => e) || []; message.sinkType = object.sinkType ?? SinkType.UNSPECIFIED; message.owner = object.owner ?? 0; message.properties = Object.entries(object.properties ?? {}).reduce<{ [key: string]: string }>( diff --git a/dashboard/proto/gen/stream_plan.ts b/dashboard/proto/gen/stream_plan.ts index e6b43c3d7d8ed..ccc7e62672836 100644 --- a/dashboard/proto/gen/stream_plan.ts +++ b/dashboard/proto/gen/stream_plan.ts @@ -409,7 +409,7 @@ export interface SinkDesc { definition: string; columns: ColumnDesc[]; pk: ColumnOrder[]; - streamKey: number[]; + downstreamPk: number[]; distributionKey: number[]; properties: { [key: string]: string }; sinkType: SinkType; @@ -1930,7 +1930,7 @@ function createBaseSinkDesc(): SinkDesc { definition: "", columns: [], pk: [], - streamKey: [], + downstreamPk: [], distributionKey: [], properties: {}, sinkType: SinkType.UNSPECIFIED, @@ -1947,7 +1947,7 @@ export const SinkDesc = { ? object.columns.map((e: any) => ColumnDesc.fromJSON(e)) : [], pk: Array.isArray(object?.pk) ? object.pk.map((e: any) => ColumnOrder.fromJSON(e)) : [], - streamKey: Array.isArray(object?.streamKey) ? object.streamKey.map((e: any) => Number(e)) : [], + downstreamPk: Array.isArray(object?.downstreamPk) ? object.downstreamPk.map((e: any) => Number(e)) : [], distributionKey: Array.isArray(object?.distributionKey) ? object.distributionKey.map((e: any) => Number(e)) : [], properties: isObject(object.properties) ? Object.entries(object.properties).reduce<{ [key: string]: string }>((acc, [key, value]) => { @@ -1974,10 +1974,10 @@ export const SinkDesc = { } else { obj.pk = []; } - if (message.streamKey) { - obj.streamKey = message.streamKey.map((e) => Math.round(e)); + if (message.downstreamPk) { + obj.downstreamPk = message.downstreamPk.map((e) => Math.round(e)); } else { - obj.streamKey = []; + obj.downstreamPk = []; } if (message.distributionKey) { obj.distributionKey = message.distributionKey.map((e) => Math.round(e)); @@ -2001,7 +2001,7 @@ export const SinkDesc = { message.definition = object.definition ?? ""; message.columns = object.columns?.map((e) => ColumnDesc.fromPartial(e)) || []; message.pk = object.pk?.map((e) => ColumnOrder.fromPartial(e)) || []; - message.streamKey = object.streamKey?.map((e) => e) || []; + message.downstreamPk = object.downstreamPk?.map((e) => e) || []; message.distributionKey = object.distributionKey?.map((e) => e) || []; message.properties = Object.entries(object.properties ?? 
{}).reduce<{ [key: string]: string }>( (acc, [key, value]) => { From fbb2d928de8b8fdb18e2f6a6816d8ae2138ef643 Mon Sep 17 00:00:00 2001 From: xx01cyx Date: Fri, 17 Mar 2023 03:00:56 +0000 Subject: [PATCH 03/11] fix ut --- src/connector/src/sink/kafka.rs | 4 ++-- src/frontend/src/handler/create_sink.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 8e8662c48c5eb..36e9d1460f98b 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -618,7 +618,7 @@ mod test { let properties: HashMap = hashmap! { "properties.bootstrap.server".to_string() => "localhost:9092".to_string(), "topic".to_string() => "test".to_string(), - "format".to_string() => "append_only".to_string(), + "type".to_string() => "append-only".to_string(), "use_transaction".to_string() => "False".to_string(), "security_protocol".to_string() => "SASL".to_string(), "sasl_mechanism".to_string() => "SASL".to_string(), @@ -638,7 +638,7 @@ mod test { let properties = hashmap! { "kafka.brokers".to_string() => "localhost:29092".to_string(), "identifier".to_string() => "test_sink_1".to_string(), - "sink.type".to_string() => "append_only".to_string(), + "type".to_string() => "append-only".to_string(), "kafka.topic".to_string() => "test_topic".to_string(), }; let schema = Schema::new(vec![ diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index b7aad90ac95b7..f8928d290b3a9 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -177,7 +177,7 @@ pub mod tests { let sql = r#"CREATE SINK snk1 FROM mv1 WITH (connector = 'mysql', mysql.endpoint = '127.0.0.1:3306', mysql.table = '', mysql.database = '', mysql.user = '', - mysql.password = '', format = 'append_only', force_append_only = 'true');"#.to_string(); + mysql.password = '', type = 'append-only', force_append_only = 'true');"#.to_string(); frontend.run_sql(sql).await.unwrap(); let session = frontend.session_ref(); From c62114358f29240f12ed9ce6cbbdaaa0741be3e5 Mon Sep 17 00:00:00 2001 From: xx01cyx Date: Fri, 17 Mar 2023 05:41:34 +0000 Subject: [PATCH 04/11] rename pk to plan_pk --- dashboard/proto/gen/catalog.ts | 17 +++++++++-------- dashboard/proto/gen/stream_plan.ts | 14 +++++++------- proto/catalog.proto | 5 +++-- proto/stream_plan.proto | 2 +- src/connector/src/sink/catalog/desc.rs | 9 ++++----- src/connector/src/sink/catalog/mod.rs | 13 ++++++++----- .../src/optimizer/plan_node/stream_sink.rs | 4 ++-- src/stream/src/from_proto/sink.rs | 2 +- 8 files changed, 35 insertions(+), 31 deletions(-) diff --git a/dashboard/proto/gen/catalog.ts b/dashboard/proto/gen/catalog.ts index eb4ed2e147db8..c79e04f959387 100644 --- a/dashboard/proto/gen/catalog.ts +++ b/dashboard/proto/gen/catalog.ts @@ -165,10 +165,11 @@ export interface Sink { databaseId: number; name: string; columns: ColumnCatalog[]; - pk: ColumnOrder[]; + /** Primary key derived from the SQL by the frontend. */ + planPk: ColumnOrder[]; dependentRelations: number[]; distributionKey: number[]; - /** User-defined primary key indices for upsert sink. */ + /** User-defined primary key indices for the upsert sink. */ downstreamPk: number[]; sinkType: SinkType; owner: number; @@ -618,7 +619,7 @@ function createBaseSink(): Sink { databaseId: 0, name: "", columns: [], - pk: [], + planPk: [], dependentRelations: [], distributionKey: [], downstreamPk: [], @@ -637,7 +638,7 @@ export const Sink = { databaseId: isSet(object.databaseId) ? 
Number(object.databaseId) : 0, name: isSet(object.name) ? String(object.name) : "", columns: Array.isArray(object?.columns) ? object.columns.map((e: any) => ColumnCatalog.fromJSON(e)) : [], - pk: Array.isArray(object?.pk) ? object.pk.map((e: any) => ColumnOrder.fromJSON(e)) : [], + planPk: Array.isArray(object?.planPk) ? object.planPk.map((e: any) => ColumnOrder.fromJSON(e)) : [], dependentRelations: Array.isArray(object?.dependentRelations) ? object.dependentRelations.map((e: any) => Number(e)) : [], @@ -668,10 +669,10 @@ export const Sink = { } else { obj.columns = []; } - if (message.pk) { - obj.pk = message.pk.map((e) => e ? ColumnOrder.toJSON(e) : undefined); + if (message.planPk) { + obj.planPk = message.planPk.map((e) => e ? ColumnOrder.toJSON(e) : undefined); } else { - obj.pk = []; + obj.planPk = []; } if (message.dependentRelations) { obj.dependentRelations = message.dependentRelations.map((e) => Math.round(e)); @@ -707,7 +708,7 @@ export const Sink = { message.databaseId = object.databaseId ?? 0; message.name = object.name ?? ""; message.columns = object.columns?.map((e) => ColumnCatalog.fromPartial(e)) || []; - message.pk = object.pk?.map((e) => ColumnOrder.fromPartial(e)) || []; + message.planPk = object.planPk?.map((e) => ColumnOrder.fromPartial(e)) || []; message.dependentRelations = object.dependentRelations?.map((e) => e) || []; message.distributionKey = object.distributionKey?.map((e) => e) || []; message.downstreamPk = object.downstreamPk?.map((e) => e) || []; diff --git a/dashboard/proto/gen/stream_plan.ts b/dashboard/proto/gen/stream_plan.ts index ccc7e62672836..abf3c5ae90626 100644 --- a/dashboard/proto/gen/stream_plan.ts +++ b/dashboard/proto/gen/stream_plan.ts @@ -408,7 +408,7 @@ export interface SinkDesc { name: string; definition: string; columns: ColumnDesc[]; - pk: ColumnOrder[]; + planPk: ColumnOrder[]; downstreamPk: number[]; distributionKey: number[]; properties: { [key: string]: string }; @@ -1929,7 +1929,7 @@ function createBaseSinkDesc(): SinkDesc { name: "", definition: "", columns: [], - pk: [], + planPk: [], downstreamPk: [], distributionKey: [], properties: {}, @@ -1946,7 +1946,7 @@ export const SinkDesc = { columns: Array.isArray(object?.columns) ? object.columns.map((e: any) => ColumnDesc.fromJSON(e)) : [], - pk: Array.isArray(object?.pk) ? object.pk.map((e: any) => ColumnOrder.fromJSON(e)) : [], + planPk: Array.isArray(object?.planPk) ? object.planPk.map((e: any) => ColumnOrder.fromJSON(e)) : [], downstreamPk: Array.isArray(object?.downstreamPk) ? object.downstreamPk.map((e: any) => Number(e)) : [], distributionKey: Array.isArray(object?.distributionKey) ? object.distributionKey.map((e: any) => Number(e)) : [], properties: isObject(object.properties) @@ -1969,10 +1969,10 @@ export const SinkDesc = { } else { obj.columns = []; } - if (message.pk) { - obj.pk = message.pk.map((e) => e ? ColumnOrder.toJSON(e) : undefined); + if (message.planPk) { + obj.planPk = message.planPk.map((e) => e ? ColumnOrder.toJSON(e) : undefined); } else { - obj.pk = []; + obj.planPk = []; } if (message.downstreamPk) { obj.downstreamPk = message.downstreamPk.map((e) => Math.round(e)); @@ -2000,7 +2000,7 @@ export const SinkDesc = { message.name = object.name ?? ""; message.definition = object.definition ?? 
""; message.columns = object.columns?.map((e) => ColumnDesc.fromPartial(e)) || []; - message.pk = object.pk?.map((e) => ColumnOrder.fromPartial(e)) || []; + message.planPk = object.planPk?.map((e) => ColumnOrder.fromPartial(e)) || []; message.downstreamPk = object.downstreamPk?.map((e) => e) || []; message.distributionKey = object.distributionKey?.map((e) => e) || []; message.properties = Object.entries(object.properties ?? {}).reduce<{ [key: string]: string }>( diff --git a/proto/catalog.proto b/proto/catalog.proto index 31f6667db2f15..6f8ee99e2478a 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -73,10 +73,11 @@ message Sink { uint32 database_id = 3; string name = 4; repeated plan_common.ColumnCatalog columns = 5; - repeated common.ColumnOrder pk = 6; + // Primary key derived from the SQL by the frontend. + repeated common.ColumnOrder plan_pk = 6; repeated uint32 dependent_relations = 7; repeated int32 distribution_key = 8; - // User-defined primary key indices for upsert sink. + // User-defined primary key indices for the upsert sink. repeated int32 downstream_pk = 9; SinkType sink_type = 10; uint32 owner = 11; diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 7bc96129cb778..5d05d55630ed8 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -142,7 +142,7 @@ message SinkDesc { string name = 2; string definition = 3; repeated plan_common.ColumnDesc columns = 4; - repeated common.ColumnOrder pk = 5; + repeated common.ColumnOrder plan_pk = 5; repeated uint32 downstream_pk = 6; repeated uint32 distribution_key = 7; map properties = 8; diff --git a/src/connector/src/sink/catalog/desc.rs b/src/connector/src/sink/catalog/desc.rs index a762ad19b5271..a1b7898df6667 100644 --- a/src/connector/src/sink/catalog/desc.rs +++ b/src/connector/src/sink/catalog/desc.rs @@ -36,9 +36,8 @@ pub struct SinkDesc { /// All columns of the sink. Note that this is NOT sorted by columnId in the vector. pub columns: Vec, - /// Primiary keys of the sink (connector). Now the sink does not care about a field's - /// order (ASC/DESC). - pub pk: Vec, + /// Primiary keys of the sink. Derived by the frontend. + pub plan_pk: Vec, /// User-defined primary key indices for upsert sink. pub downstream_pk: Vec, @@ -71,7 +70,7 @@ impl SinkDesc { name: self.name, definition: self.definition, columns: self.columns, - pk: self.pk, + plan_pk: self.plan_pk, downstream_pk: self.downstream_pk, distribution_key: self.distribution_key, owner, @@ -91,7 +90,7 @@ impl SinkDesc { .iter() .map(|column| Into::::into(&column.column_desc)) .collect_vec(), - pk: self.pk.iter().map(|k| k.to_protobuf()).collect_vec(), + plan_pk: self.plan_pk.iter().map(|k| k.to_protobuf()).collect_vec(), downstream_pk: self.downstream_pk.iter().map(|idx| *idx as _).collect_vec(), distribution_key: self.distribution_key.iter().map(|k| *k as _).collect_vec(), properties: self.properties.clone().into_iter().collect(), diff --git a/src/connector/src/sink/catalog/mod.rs b/src/connector/src/sink/catalog/mod.rs index 6bb6968e0ce31..d221f13b548ab 100644 --- a/src/connector/src/sink/catalog/mod.rs +++ b/src/connector/src/sink/catalog/mod.rs @@ -115,9 +115,8 @@ pub struct SinkCatalog { /// All columns of the sink. Note that this is NOT sorted by columnId in the vector. pub columns: Vec, - /// Primiary keys of the sink (connector). Now the sink does not care about a field's - /// order (ASC/DESC). - pub pk: Vec, + /// Primiary keys of the sink. Derived by the frontend. 
+ pub plan_pk: Vec, /// User-defined primary key indices for upsert sink. pub downstream_pk: Vec, @@ -150,7 +149,7 @@ impl SinkCatalog { name: self.name.clone(), definition: self.definition.clone(), columns: self.columns.iter().map(|c| c.to_protobuf()).collect_vec(), - pk: self.pk.iter().map(|o| o.to_protobuf()).collect(), + plan_pk: self.plan_pk.iter().map(|o| o.to_protobuf()).collect(), downstream_pk: self .downstream_pk .iter() @@ -200,7 +199,11 @@ impl From for SinkCatalog { .into_iter() .map(ColumnCatalog::from) .collect_vec(), - pk: pb.pk.iter().map(ColumnOrder::from_protobuf).collect_vec(), + plan_pk: pb + .plan_pk + .iter() + .map(ColumnOrder::from_protobuf) + .collect_vec(), downstream_pk: pb.downstream_pk.iter().map(|k| *k as _).collect_vec(), distribution_key: pb.distribution_key.iter().map(|k| *k as _).collect_vec(), properties: pb.properties.clone(), diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index e7545d780ab4f..b4020ad054efb 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -126,7 +126,7 @@ impl StreamSink { name, definition, columns, - pk, + plan_pk: pk, downstream_pk, distribution_key, properties: properties.into_inner(), @@ -265,7 +265,7 @@ impl fmt::Display for StreamSink { &IndicesDisplay { indices: &self .sink_desc - .pk + .plan_pk .iter() .map(|k| k.column_index) .collect_vec(), diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index ee7dc0628ba60..e524963187f11 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -37,7 +37,7 @@ impl ExecutorBuilder for SinkExecutorBuilder { let sink_type = SinkType::from_proto(sink_desc.get_sink_type().unwrap()); let mut properties = sink_desc.get_properties().clone(); let pk_indices = sink_desc - .pk + .plan_pk .iter() .map(|pk| pk.column_index as usize) .collect::>(); From 624aeb28964a06e34423474a7530f61557b72562 Mon Sep 17 00:00:00 2001 From: xx01cyx Date: Fri, 17 Mar 2023 05:44:54 +0000 Subject: [PATCH 05/11] enable sink e2e test --- ci/scripts/e2e-sink-test.sh | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/ci/scripts/e2e-sink-test.sh b/ci/scripts/e2e-sink-test.sh index 3e5328aa49ff3..3211b3569f55d 100755 --- a/ci/scripts/e2e-sink-test.sh +++ b/ci/scripts/e2e-sink-test.sh @@ -88,29 +88,29 @@ cargo make ci-start ci-1cn-1fe echo "--- testing sinks" sqllogictest -p 4566 -d dev './e2e_test/sink/append_only_sink.slt' -# sqllogictest -p 4566 -d dev './e2e_test/sink/create_sink_as.slt' +sqllogictest -p 4566 -d dev './e2e_test/sink/create_sink_as.slt' sqllogictest -p 4566 -d dev './e2e_test/sink/blackhole_sink.slt' sleep 1 # check sink destination postgres -# sqllogictest -p 4566 -d dev './e2e_test/sink/remote/jdbc.load.slt' -# sleep 1 -# sqllogictest -h db -p 5432 -d test './e2e_test/sink/remote/jdbc.check.pg.slt' -# sleep 1 +sqllogictest -p 4566 -d dev './e2e_test/sink/remote/jdbc.load.slt' +sleep 1 +sqllogictest -h db -p 5432 -d test './e2e_test/sink/remote/jdbc.check.pg.slt' +sleep 1 # check sink destination mysql using shell -# if mysql --host=mysql --port=3306 -u root -p123456 -sN -e "SELECT * FROM test.t_remote ORDER BY id;" | awk '{ -# if ($1 == 1 && $2 == "Alex") c1++; -# if ($1 == 3 && $2 == "Carl") c2++; -# if ($1 == 4 && $2 == "Doris") c3++; -# if ($1 == 5 && $2 == "Eve") c4++; -# if ($1 == 6 && $2 == "Frank") c5++; } -# END { exit !(c1 == 1 && c2 == 1 
&& c3 == 1 && c4 == 1 && c5 == 1); }'; then -# echo "mysql sink check passed" -# else -# echo "The output is not as expected." -# exit 1 -# fi +if mysql --host=mysql --port=3306 -u root -p123456 -sN -e "SELECT * FROM test.t_remote ORDER BY id;" | awk '{ +if ($1 == 1 && $2 == "Alex") c1++; + if ($1 == 3 && $2 == "Carl") c2++; + if ($1 == 4 && $2 == "Doris") c3++; + if ($1 == 5 && $2 == "Eve") c4++; + if ($1 == 6 && $2 == "Frank") c5++; } + END { exit !(c1 == 1 && c2 == 1 && c3 == 1 && c4 == 1 && c5 == 1); }'; then + echo "mysql sink check passed" +else + echo "The output is not as expected." + exit 1 +fi echo "--- Kill cluster" pkill -f connector-node From c047965f2aaa71b655892fb96697ccb9601ceaf3 Mon Sep 17 00:00:00 2001 From: xx01cyx Date: Fri, 17 Mar 2023 05:47:42 +0000 Subject: [PATCH 06/11] add TODO to reuse conductor instance --- src/connector/src/sink/kafka.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 36e9d1460f98b..4354b9c5c8698 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -149,6 +149,7 @@ impl KafkaSink { } // Try Kafka connection. + // TODO: Reuse the conductor instance we create during validation. KafkaTransactionConductor::new(config).await?; Ok(()) From 434c808f5cb9ab7c4d943c975fab1b8d456ed6f0 Mon Sep 17 00:00:00 2001 From: xx01cyx Date: Fri, 17 Mar 2023 05:51:40 +0000 Subject: [PATCH 07/11] add comment about sink primary key format --- src/frontend/src/optimizer/plan_node/stream_sink.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index b4020ad054efb..87fa8f7a83d21 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -187,6 +187,9 @@ impl StreamSink { /// Extract user-defined downstream pk columns from with options. Return the indices of the pk /// columns. + /// + /// The format of `downstream_pk_str` should be 'col1,col2,...' (delimited by `,`) in order to + /// get parsed. 
fn parse_downstream_pk( columns: &[ColumnCatalog], downstream_pk_str: Option<&String>, From 3319abeebe5cac5816b36af3ce1bd10f55f63591 Mon Sep 17 00:00:00 2001 From: xx01cyx Date: Fri, 17 Mar 2023 06:14:02 +0000 Subject: [PATCH 08/11] rename sink.mode to type for iceberg sink --- e2e_test/sink/iceberg_sink.slt | 2 +- integration_tests/iceberg-sink/create_sink.sql | 2 +- .../java/com/risingwave/connector/api/TableSchema.java | 2 -- java/connector-node/python-client/integration_tests.py | 4 ++-- .../com/risingwave/connector/IcebergSinkFactory.java | 10 +++++----- .../risingwave/connector/IcebergSinkFactoryTest.java | 2 +- 6 files changed, 10 insertions(+), 12 deletions(-) diff --git a/e2e_test/sink/iceberg_sink.slt b/e2e_test/sink/iceberg_sink.slt index ecb91b96fd0f7..144a169f2d49d 100644 --- a/e2e_test/sink/iceberg_sink.slt +++ b/e2e_test/sink/iceberg_sink.slt @@ -7,7 +7,7 @@ CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6; statement ok CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH ( connector = 'iceberg', - sink.mode='append-only', + type='append-only', warehouse.path = 's3://iceberg', s3.endpoint = 'http://127.0.0.1:9301', s3.access.key = 'hummockadmin', diff --git a/integration_tests/iceberg-sink/create_sink.sql b/integration_tests/iceberg-sink/create_sink.sql index 8913dbb05525b..408b81505bafa 100644 --- a/integration_tests/iceberg-sink/create_sink.sql +++ b/integration_tests/iceberg-sink/create_sink.sql @@ -2,7 +2,7 @@ CREATE SINK bhv_iceberg_sink FROM bhv_mv WITH ( connector = 'iceberg', - sink.mode='upsert', + type = 'upsert', warehouse.path = 's3://hummock001/iceberg-data', s3.endpoint = 'http://minio-0:9301', s3.access.key = 'hummockadmin', diff --git a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/TableSchema.java b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/TableSchema.java index 5d1016c95ea49..053ba1e329920 100644 --- a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/TableSchema.java +++ b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/TableSchema.java @@ -102,8 +102,6 @@ public static TableSchema fromProto(ConnectorServiceProto.TableSchema tableSchem .collect(Collectors.toList())); } - /** @deprecated pk here is from Risingwave, it may not match the pk in the database */ - @Deprecated public List getPrimaryKeys() { return primaryKeys; } diff --git a/java/connector-node/python-client/integration_tests.py b/java/connector-node/python-client/integration_tests.py index 549f54015d0dc..5a3dd932cf795 100644 --- a/java/connector-node/python-client/integration_tests.py +++ b/java/connector-node/python-client/integration_tests.py @@ -160,7 +160,7 @@ def test_print_sink(input_file): def test_iceberg_sink(input_file): test_sink("iceberg", - {"sink.mode":"append-only", + {"type":"append-only", "warehouse.path":"s3a://bucket", "s3.endpoint": "http://127.0.0.1:9000", "s3.access.key": "minioadmin", @@ -171,7 +171,7 @@ def test_iceberg_sink(input_file): def test_upsert_iceberg_sink(input_file): test_upsert_sink("iceberg", - {"sink.mode":"upsert", + {"type":"upsert", "warehouse.path":"s3a://bucket", "s3.endpoint": "http://127.0.0.1:9000", "s3.access.key": "minioadmin", diff --git a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkFactory.java b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkFactory.java index 424f7e415d58f..1cdb1edf0e468 100644 --- 
a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkFactory.java +++ b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkFactory.java @@ -39,7 +39,7 @@ public class IcebergSinkFactory implements SinkFactory { private static final Logger LOG = LoggerFactory.getLogger(IcebergSinkFactory.class); - public static final String SINK_MODE_PROP = "sink.mode"; + public static final String SINK_TYPE_PROP = "type"; public static final String WAREHOUSE_PATH_PROP = "warehouse.path"; public static final String DATABASE_NAME_PROP = "database.name"; public static final String TABLE_NAME_PROP = "table.name"; @@ -58,7 +58,7 @@ public class IcebergSinkFactory implements SinkFactory { @Override public SinkBase create(TableSchema tableSchema, Map tableProperties) { - String mode = tableProperties.get(SINK_MODE_PROP); + String mode = tableProperties.get(SINK_TYPE_PROP); String warehousePath = getWarehousePath(tableProperties); String databaseName = tableProperties.get(DATABASE_NAME_PROP); String tableName = tableProperties.get(TABLE_NAME_PROP); @@ -93,7 +93,7 @@ public SinkBase create(TableSchema tableSchema, Map tablePropert @Override public void validate( TableSchema tableSchema, Map tableProperties, SinkType sinkType) { - if (!tableProperties.containsKey(SINK_MODE_PROP) // only append-only, upsert + if (!tableProperties.containsKey(SINK_TYPE_PROP) // only append-only, upsert || !tableProperties.containsKey(WAREHOUSE_PATH_PROP) || !tableProperties.containsKey(DATABASE_NAME_PROP) || !tableProperties.containsKey(TABLE_NAME_PROP)) { @@ -101,14 +101,14 @@ public void validate( .withDescription( String.format( "%s, %s, %s or %s is not specified", - SINK_MODE_PROP, + SINK_TYPE_PROP, WAREHOUSE_PATH_PROP, DATABASE_NAME_PROP, TABLE_NAME_PROP)) .asRuntimeException(); } - String mode = tableProperties.get(SINK_MODE_PROP); + String mode = tableProperties.get(SINK_TYPE_PROP); String databaseName = tableProperties.get(DATABASE_NAME_PROP); String tableName = tableProperties.get(TABLE_NAME_PROP); String warehousePath = getWarehousePath(tableProperties); diff --git a/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/IcebergSinkFactoryTest.java b/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/IcebergSinkFactoryTest.java index 6ff0ce99e38a8..832ac2746f973 100644 --- a/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/IcebergSinkFactoryTest.java +++ b/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/IcebergSinkFactoryTest.java @@ -64,7 +64,7 @@ public void testCreate() throws IOException { sinkFactory.create( TableSchema.getMockTableSchema(), Map.of( - IcebergSinkFactory.SINK_MODE_PROP, + IcebergSinkFactory.SINK_TYPE_PROP, sinkMode, IcebergSinkFactory.WAREHOUSE_PATH_PROP, warehousePath, From 83c982886dfce078fcf88e547e280cf32ced50c7 Mon Sep 17 00:00:00 2001 From: xx01cyx Date: Fri, 17 Mar 2023 06:28:50 +0000 Subject: [PATCH 09/11] fix iceberg sink test --- e2e_test/sink/iceberg_sink.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e_test/sink/iceberg_sink.slt b/e2e_test/sink/iceberg_sink.slt index 144a169f2d49d..1988d00d1df11 100644 --- a/e2e_test/sink/iceberg_sink.slt +++ b/e2e_test/sink/iceberg_sink.slt @@ -7,7 +7,7 @@ CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6; statement ok CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH ( connector = 'iceberg', - 
type='append-only', + type = 'upsert', warehouse.path = 's3://iceberg', s3.endpoint = 'http://127.0.0.1:9301', s3.access.key = 'hummockadmin', From 6fa59cc8b2b0953e26bce55b352a36293de338a2 Mon Sep 17 00:00:00 2001 From: xx01cyx Date: Fri, 17 Mar 2023 06:44:47 +0000 Subject: [PATCH 10/11] fix iceberg sink test --- e2e_test/sink/iceberg_sink.slt | 1 + 1 file changed, 1 insertion(+) diff --git a/e2e_test/sink/iceberg_sink.slt b/e2e_test/sink/iceberg_sink.slt index 1988d00d1df11..c0bcb3b62559b 100644 --- a/e2e_test/sink/iceberg_sink.slt +++ b/e2e_test/sink/iceberg_sink.slt @@ -8,6 +8,7 @@ statement ok CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH ( connector = 'iceberg', type = 'upsert', + primary_key = 'v1', warehouse.path = 's3://iceberg', s3.endpoint = 'http://127.0.0.1:9301', s3.access.key = 'hummockadmin', From d3a4efa1d21fceaca317714ccfcb2210f05c2e8e Mon Sep 17 00:00:00 2001 From: xx01cyx Date: Fri, 17 Mar 2023 07:29:26 +0000 Subject: [PATCH 11/11] remove unnecessary code --- src/connector/src/sink/mod.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 6ab5da3431d9e..cb2803c60a8de 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -170,13 +170,9 @@ impl SinkImpl { SinkConfig::Redis(cfg) => RedisSink::new(cfg, sink_catalog.schema()).map(|_| ()), SinkConfig::Kafka(cfg) => { if sink_catalog.sink_type.is_append_only() { - KafkaSink::::validate(*cfg, sink_catalog.downstream_pk_indices()) - .await - .map(|_| ()) + KafkaSink::::validate(*cfg, sink_catalog.downstream_pk_indices()).await } else { - KafkaSink::::validate(*cfg, sink_catalog.downstream_pk_indices()) - .await - .map(|_| ()) + KafkaSink::::validate(*cfg, sink_catalog.downstream_pk_indices()).await } } SinkConfig::Remote(cfg) => {
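
A sketch of the end-state sink DDL these patches add, written in the style of the e2e tests above; the table, broker address, and topic names are illustrative placeholders rather than values taken from the patches, but the option names (`type`, `primary_key`, `force_append_only`) are the ones introduced in this series:

    create table t (v1 int, v2 int);

    -- Upsert sink: the comma-delimited `primary_key` option is parsed by
    -- parse_downstream_pk() into downstream_pk column indices (here: v2).
    create sink s_upsert from t with (
        connector = 'kafka',
        properties.bootstrap.server = '127.0.0.1:29092',
        topic = 'sink_upsert_demo',
        type = 'upsert',
        primary_key = 'v2'
    );

    -- Forcing append-only still requires both options together, matching the
    -- "Cannot force the sink to be append-only" check in stream_sink.rs.
    create sink s_force from t with (
        connector = 'kafka',
        properties.bootstrap.server = '127.0.0.1:29092',
        topic = 'sink_append_only_demo',
        type = 'append-only',
        force_append_only = 'true'
    );

Omitting `primary_key` on the upsert sink would now be rejected by `KafkaSink::validate` with the "primary key not defined" error, and the old `format = 'append_only'` spelling is replaced by `type = 'append-only'` throughout.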