From 15378805d21d426f2e7451b5bd37193da041ab47 Mon Sep 17 00:00:00 2001
From: Yuanxin Cao <60498509+xx01cyx@users.noreply.github.com>
Date: Mon, 27 Feb 2023 17:20:34 +0800
Subject: [PATCH] feat(sink): support upsert-kafka sink (#8168)

support upsert kafka
---
 src/connector/src/sink/kafka.rs | 118 ++++++++++++++++++++++++--------
 src/connector/src/sink/mod.rs   |   9 ++-
 2 files changed, 98 insertions(+), 29 deletions(-)

diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs
index b6063225c1ecc..0a429e4ce7bb8 100644
--- a/src/connector/src/sink/kafka.rs
+++ b/src/connector/src/sink/kafka.rs
@@ -33,7 +33,7 @@ use serde_derive::Deserialize;
 use serde_json::{json, Map, Value};
 use tracing::warn;
 
-use super::{Sink, SinkError, SINK_FORMAT_APPEND_ONLY, SINK_FORMAT_DEBEZIUM};
+use super::{Sink, SinkError, SINK_FORMAT_APPEND_ONLY, SINK_FORMAT_DEBEZIUM, SINK_FORMAT_UPSERT};
 use crate::common::KafkaCommon;
 use crate::sink::Result;
 use crate::{deserialize_bool_from_string, deserialize_duration_from_string};
@@ -61,7 +61,7 @@ pub struct KafkaConfig {
     #[serde(flatten)]
     pub common: KafkaCommon,
 
-    pub format: String, // accept "append_only" or "debezium"
+    pub format: String, // accept "append_only", "debezium", or "upsert"
 
     pub identifier: String,
 
@@ -94,9 +94,12 @@ impl KafkaConfig {
         let config = serde_json::from_value::<KafkaConfig>(serde_json::to_value(values).unwrap())
             .map_err(|e| SinkError::Config(anyhow!(e)))?;
 
-        if config.format != SINK_FORMAT_APPEND_ONLY && config.format != SINK_FORMAT_DEBEZIUM {
+        if config.format != SINK_FORMAT_APPEND_ONLY
+            && config.format != SINK_FORMAT_DEBEZIUM
+            && config.format != SINK_FORMAT_UPSERT
+        {
             return Err(SinkError::Config(anyhow!(
-                "format must be either append_only or debezium"
+                "format must be append_only, debezium, or upsert"
             )));
         }
         Ok(config)
@@ -115,17 +118,19 @@ pub struct KafkaSink<const APPEND_ONLY: bool> {
     pub conductor: KafkaTransactionConductor,
     state: KafkaSinkState,
     schema: Schema,
+    pk_indices: Vec<usize>,
     in_transaction_epoch: Option<u64>,
 }
 
 impl<const APPEND_ONLY: bool> KafkaSink<APPEND_ONLY> {
-    pub async fn new(config: KafkaConfig, schema: Schema) -> Result<Self> {
+    pub async fn new(config: KafkaConfig, schema: Schema, pk_indices: Vec<usize>) -> Result<Self> {
         Ok(KafkaSink {
             config: config.clone(),
             conductor: KafkaTransactionConductor::new(config).await?,
             in_transaction_epoch: None,
             state: KafkaSinkState::Init,
             schema,
+            pk_indices,
         })
     }
 
@@ -181,15 +186,16 @@ impl<const APPEND_ONLY: bool> KafkaSink<APPEND_ONLY> {
         )
     }
 
-    async fn debezium_update(&self, chunk: StreamChunk, schema: &Schema, ts_ms: u64) -> Result<()> {
+    async fn debezium_update(&self, chunk: StreamChunk, ts_ms: u64) -> Result<()> {
         let mut update_cache: Option<Map<String, Value>> = None;
+        let schema = &self.schema;
         for (op, row) in chunk.rows() {
             let event_object = match op {
                 Op::Insert => Some(json!({
                     "schema": schema_to_json(schema),
                     "payload": {
                         "before": null,
-                        "after": record_to_json(row, schema.fields.clone())?,
+                        "after": record_to_json(row, &schema.fields)?,
                         "op": "c",
                         "ts_ms": ts_ms,
                     }
@@ -197,14 +203,14 @@ impl<const APPEND_ONLY: bool> KafkaSink<APPEND_ONLY> {
                 Op::Delete => Some(json!({
                     "schema": schema_to_json(schema),
                     "payload": {
-                        "before": record_to_json(row, schema.fields.clone())?,
+                        "before": record_to_json(row, &schema.fields)?,
                         "after": null,
                         "op": "d",
                         "ts_ms": ts_ms,
                     }
                 })),
                 Op::UpdateDelete => {
-                    update_cache = Some(record_to_json(row, schema.fields.clone())?);
+                    update_cache = Some(record_to_json(row, &schema.fields)?);
                     continue;
                 }
                 Op::UpdateInsert => {
@@ -213,14 +219,14 @@ impl<const APPEND_ONLY: bool> KafkaSink<APPEND_ONLY> {
                             "schema": schema_to_json(schema),
                             "payload": {
                                 "before": before,
-                                "after": record_to_json(row, schema.fields.clone())?,
+                                "after": record_to_json(row, &schema.fields)?,
                                 "op": "u",
                                 "ts_ms": ts_ms,
                             }
                         }))
                     } else {
                         warn!(
-                            "not found UpdateDelete in prev row, skipping, row_id {:?}",
+                            "not found UpdateDelete in prev row, skipping, row index {:?}",
                             row.index()
                         );
                         continue;
@@ -239,10 +245,46 @@ impl<const APPEND_ONLY: bool> KafkaSink<APPEND_ONLY> {
         Ok(())
     }
 
-    async fn append_only(&self, chunk: StreamChunk, schema: &Schema) -> Result<()> {
+    async fn upsert(&self, chunk: StreamChunk) -> Result<()> {
+        let mut update_cache: Option<Map<String, Value>> = None;
+        let schema = &self.schema;
+        for (op, row) in chunk.rows() {
+            let event_object = match op {
+                Op::Insert => Some(Value::Object(record_to_json(row, &schema.fields)?)),
+                Op::Delete => Some(Value::Null),
+                Op::UpdateDelete => {
+                    update_cache = Some(record_to_json(row, &schema.fields)?);
+                    continue;
+                }
+                Op::UpdateInsert => {
+                    if update_cache.take().is_some() {
+                        Some(Value::Object(record_to_json(row, &schema.fields)?))
+                    } else {
+                        warn!(
+                            "not found UpdateDelete in prev row, skipping, row index {:?}",
+                            row.index()
+                        );
+                        continue;
+                    }
+                }
+            };
+            if let Some(obj) = event_object {
+                let event_key = Value::Object(pk_to_json(row, &schema.fields, &self.pk_indices)?);
+                self.send(
+                    BaseRecord::to(self.config.common.topic.as_str())
+                        .key(event_key.to_string().as_bytes())
+                        .payload(obj.to_string().as_bytes()),
+                )
+                .await?;
+            }
+        }
+        Ok(())
+    }
+
+    async fn append_only(&self, chunk: StreamChunk) -> Result<()> {
         for (op, row) in chunk.rows() {
             if op == Op::Insert {
-                let record = Value::Object(record_to_json(row, schema.fields.clone())?).to_string();
+                let record = Value::Object(record_to_json(row, &self.schema.fields)?).to_string();
                 self.send(
                     BaseRecord::to(self.config.common.topic.as_str())
                         .key(self.gen_message_key().as_bytes())
@@ -259,18 +301,23 @@ impl<const APPEND_ONLY: bool> Sink for KafkaSink<APPEND_ONLY> {
     async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
         if APPEND_ONLY {
-            self.append_only(chunk, &self.schema).await
+            // Append-only
+            self.append_only(chunk).await
         } else {
-            // TODO: Distinguish "upsert" from "debezium" later.
-            self.debezium_update(
-                chunk,
-                &self.schema,
-                SystemTime::now()
-                    .duration_since(UNIX_EPOCH)
-                    .unwrap()
-                    .as_millis() as u64,
-            )
-            .await
+            // Debezium
+            if self.config.format == SINK_FORMAT_DEBEZIUM {
+                self.debezium_update(
+                    chunk,
+                    SystemTime::now()
+                        .duration_since(UNIX_EPOCH)
+                        .unwrap()
+                        .as_millis() as u64,
+                )
+                .await
+            } else {
+                // Upsert
+                self.upsert(chunk).await
+            }
         }
     }
 
@@ -397,7 +444,7 @@ fn datum_to_json_object(field: &Field, datum: DatumRef<'_>) -> ArrayResult<Value> {
     Ok(value)
 }
 
-fn record_to_json(row: RowRef<'_>, schema: Vec<Field>) -> Result<Map<String, Value>> {
+fn record_to_json(row: RowRef<'_>, schema: &[Field]) -> Result<Map<String, Value>> {
     let mut mappings = Map::with_capacity(schema.len());
     for (field, datum_ref) in schema.iter().zip_eq_fast(row.iter()) {
         let key = field.name.clone();
@@ -408,10 +455,26 @@ fn record_to_json(row: RowRef<'_>, schema: Vec<Field>) -> Result<Map<String, Value>> {
     Ok(mappings)
 }
 
+fn pk_to_json(
+    row: RowRef<'_>,
+    schema: &[Field],
+    pk_indices: &[usize],
+) -> Result<Map<String, Value>> {
+    let mut mappings = Map::with_capacity(schema.len());
+    for idx in pk_indices {
+        let field = &schema[*idx];
+        let key = field.name.clone();
+        let value = datum_to_json_object(field, row.datum_at(*idx))
+            .map_err(|e| SinkError::JsonParse(e.to_string()))?;
+        mappings.insert(key, value);
+    }
+    Ok(mappings)
+}
+
 pub fn chunk_to_json(chunk: StreamChunk, schema: &Schema) -> Result<Vec<String>> {
     let mut records: Vec<String> = Vec::with_capacity(chunk.capacity());
     for (_, row) in chunk.rows() {
-        let record = Value::Object(record_to_json(row, schema.fields.clone())?);
+        let record = Value::Object(record_to_json(row, &schema.fields)?);
         records.push(record.to_string());
     }
 
@@ -572,8 +635,9 @@ mod test {
                 type_name: "".into(),
             },
         ]);
+        let pk_indices = vec![];
         let kafka_config = KafkaConfig::from_hashmap(properties)?;
-        let mut sink = KafkaSink::<true>::new(kafka_config.clone(), schema)
+        let mut sink = KafkaSink::<true>::new(kafka_config.clone(), schema, pk_indices)
             .await
             .unwrap();
 
diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs
index 6cfb9266689d8..012d3fce5d278 100644
--- a/src/connector/src/sink/mod.rs
+++ b/src/connector/src/sink/mod.rs
@@ -41,6 +41,7 @@ use crate::ConnectorParams;
 pub const SINK_FORMAT_OPTION: &str = "format";
 pub const SINK_FORMAT_APPEND_ONLY: &str = "append_only";
 pub const SINK_FORMAT_DEBEZIUM: &str = "debezium";
+pub const SINK_FORMAT_UPSERT: &str = "upsert";
 pub const SINK_USER_FORCE_APPEND_ONLY_OPTION: &str = "force_append_only";
 
 #[async_trait]
@@ -132,10 +133,14 @@ impl SinkImpl {
             SinkConfig::Kafka(cfg) => {
                 if sink_type.is_append_only() {
                     // Append-only Kafka sink
-                    SinkImpl::Kafka(Box::new(KafkaSink::<true>::new(*cfg, schema).await?))
+                    SinkImpl::Kafka(Box::new(
+                        KafkaSink::<true>::new(*cfg, schema, pk_indices).await?,
+                    ))
                 } else {
                     // Upsert Kafka sink
-                    SinkImpl::UpsertKafka(Box::new(KafkaSink::<false>::new(*cfg, schema).await?))
+                    SinkImpl::UpsertKafka(Box::new(
+                        KafkaSink::<false>::new(*cfg, schema, pk_indices).await?,
+                    ))
                 }
             }
             SinkConfig::Console(cfg) => SinkImpl::Console(Box::new(ConsoleSink::new(cfg, schema)?)),
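
For context, the upsert path added above keys each Kafka message by the row's primary-key columns (via `pk_to_json`) and uses the full row as the message value (via `record_to_json`), emitting a `null` value as a tombstone for deletes. Below is a minimal, illustrative sketch of those key/value shapes only; the table `(id INT PRIMARY KEY, name VARCHAR)` and the row values are hypothetical and not taken from the patch, and only `serde_json` is used:

    // Illustrative only: mirrors the key/value shapes produced by `upsert()`.
    // The table (id INT PRIMARY KEY, name VARCHAR) and the values are made up.
    use serde_json::{json, Value};

    fn main() {
        // Insert / UpdateInsert: key = primary-key columns, value = the whole row.
        let key = json!({ "id": 1 });
        let value = json!({ "id": 1, "name": "Alice" });
        println!("insert -> key: {key}, value: {value}");

        // Delete: same key, `null` value, i.e. a tombstone for the topic.
        let tombstone = Value::Null;
        println!("delete -> key: {key}, value: {tombstone}");
    }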