feat(sink): support upsert-kafka sink (#8168)
Support the upsert Kafka sink.
xx01cyx authored Feb 27, 2023
1 parent 3f41e88 commit 1537880
Showing 2 changed files with 98 additions and 29 deletions.
118 changes: 91 additions & 27 deletions src/connector/src/sink/kafka.rs
@@ -33,7 +33,7 @@ use serde_derive::Deserialize;
use serde_json::{json, Map, Value};
use tracing::warn;

use super::{Sink, SinkError, SINK_FORMAT_APPEND_ONLY, SINK_FORMAT_DEBEZIUM};
use super::{Sink, SinkError, SINK_FORMAT_APPEND_ONLY, SINK_FORMAT_DEBEZIUM, SINK_FORMAT_UPSERT};
use crate::common::KafkaCommon;
use crate::sink::Result;
use crate::{deserialize_bool_from_string, deserialize_duration_from_string};
@@ -61,7 +61,7 @@ pub struct KafkaConfig {
#[serde(flatten)]
pub common: KafkaCommon,

pub format: String, // accept "append_only" or "debezium"
pub format: String, // accept "append_only", "debezium", or "upsert"

pub identifier: String,

@@ -94,9 +94,12 @@ impl KafkaConfig {
let config = serde_json::from_value::<KafkaConfig>(serde_json::to_value(values).unwrap())
.map_err(|e| SinkError::Config(anyhow!(e)))?;

if config.format != SINK_FORMAT_APPEND_ONLY && config.format != SINK_FORMAT_DEBEZIUM {
if config.format != SINK_FORMAT_APPEND_ONLY
&& config.format != SINK_FORMAT_DEBEZIUM
&& config.format != SINK_FORMAT_UPSERT
{
return Err(SinkError::Config(anyhow!(
"format must be either append_only or debezium"
"format must be append_only, debezium, or upsert"
)));
}
Ok(config)
@@ -115,17 +118,19 @@ pub struct KafkaSink<const APPEND_ONLY: bool> {
pub conductor: KafkaTransactionConductor,
state: KafkaSinkState,
schema: Schema,
pk_indices: Vec<usize>,
in_transaction_epoch: Option<u64>,
}

impl<const APPEND_ONLY: bool> KafkaSink<APPEND_ONLY> {
pub async fn new(config: KafkaConfig, schema: Schema) -> Result<Self> {
pub async fn new(config: KafkaConfig, schema: Schema, pk_indices: Vec<usize>) -> Result<Self> {
Ok(KafkaSink {
config: config.clone(),
conductor: KafkaTransactionConductor::new(config).await?,
in_transaction_epoch: None,
state: KafkaSinkState::Init,
schema,
pk_indices,
})
}

@@ -181,30 +186,31 @@ impl<const APPEND_ONLY: bool> KafkaSink<APPEND_ONLY> {
)
}

async fn debezium_update(&self, chunk: StreamChunk, schema: &Schema, ts_ms: u64) -> Result<()> {
async fn debezium_update(&self, chunk: StreamChunk, ts_ms: u64) -> Result<()> {
let mut update_cache: Option<Map<String, Value>> = None;
let schema = &self.schema;
for (op, row) in chunk.rows() {
let event_object = match op {
Op::Insert => Some(json!({
"schema": schema_to_json(schema),
"payload": {
"before": null,
"after": record_to_json(row, schema.fields.clone())?,
"after": record_to_json(row, &schema.fields)?,
"op": "c",
"ts_ms": ts_ms,
}
})),
Op::Delete => Some(json!({
"schema": schema_to_json(schema),
"payload": {
"before": record_to_json(row, schema.fields.clone())?,
"before": record_to_json(row, &schema.fields)?,
"after": null,
"op": "d",
"ts_ms": ts_ms,
}
})),
Op::UpdateDelete => {
update_cache = Some(record_to_json(row, schema.fields.clone())?);
update_cache = Some(record_to_json(row, &schema.fields)?);
continue;
}
Op::UpdateInsert => {
@@ -213,14 +219,14 @@ impl<const APPEND_ONLY: bool> KafkaSink<APPEND_ONLY> {
"schema": schema_to_json(schema),
"payload": {
"before": before,
"after": record_to_json(row, schema.fields.clone())?,
"after": record_to_json(row, &schema.fields)?,
"op": "u",
"ts_ms": ts_ms,
}
}))
} else {
warn!(
"not found UpdateDelete in prev row, skipping, row_id {:?}",
"not found UpdateDelete in prev row, skipping, row index {:?}",
row.index()
);
continue;
@@ -239,10 +245,46 @@ impl<const APPEND_ONLY: bool> KafkaSink<APPEND_ONLY> {
Ok(())
}

async fn append_only(&self, chunk: StreamChunk, schema: &Schema) -> Result<()> {
async fn upsert(&self, chunk: StreamChunk) -> Result<()> {
let mut update_cache: Option<Map<String, Value>> = None;
let schema = &self.schema;
for (op, row) in chunk.rows() {
let event_object = match op {
Op::Insert => Some(Value::Object(record_to_json(row, &schema.fields)?)),
Op::Delete => Some(Value::Null),
Op::UpdateDelete => {
update_cache = Some(record_to_json(row, &schema.fields)?);
continue;
}
Op::UpdateInsert => {
if update_cache.take().is_some() {
Some(Value::Object(record_to_json(row, &schema.fields)?))
} else {
warn!(
"not found UpdateDelete in prev row, skipping, row index {:?}",
row.index()
);
continue;
}
}
};
if let Some(obj) = event_object {
let event_key = Value::Object(pk_to_json(row, &schema.fields, &self.pk_indices)?);
self.send(
BaseRecord::to(self.config.common.topic.as_str())
.key(event_key.to_string().as_bytes())
.payload(obj.to_string().as_bytes()),
)
.await?;
}
}
Ok(())
}

async fn append_only(&self, chunk: StreamChunk) -> Result<()> {
for (op, row) in chunk.rows() {
if op == Op::Insert {
let record = Value::Object(record_to_json(row, schema.fields.clone())?).to_string();
let record = Value::Object(record_to_json(row, &self.schema.fields)?).to_string();
self.send(
BaseRecord::to(self.config.common.topic.as_str())
.key(self.gen_message_key().as_bytes())
@@ -259,18 +301,23 @@ impl<const APPEND_ONLY: bool> KafkaSink<APPEND_ONLY> {
impl<const APPEND_ONLY: bool> Sink for KafkaSink<APPEND_ONLY> {
async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
if APPEND_ONLY {
self.append_only(chunk, &self.schema).await
// Append-only
self.append_only(chunk).await
} else {
// TODO: Distinguish "upsert" from "debezium" later.
self.debezium_update(
chunk,
&self.schema,
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64,
)
.await
// Debezium
if self.config.format == SINK_FORMAT_DEBEZIUM {
self.debezium_update(
chunk,
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64,
)
.await
} else {
// Upsert
self.upsert(chunk).await
}
}
}

@@ -397,7 +444,7 @@ fn datum_to_json_object(field: &Field, datum: DatumRef<'_>) -> ArrayResult<Value
Ok(value)
}

fn record_to_json(row: RowRef<'_>, schema: Vec<Field>) -> Result<Map<String, Value>> {
fn record_to_json(row: RowRef<'_>, schema: &[Field]) -> Result<Map<String, Value>> {
let mut mappings = Map::with_capacity(schema.len());
for (field, datum_ref) in schema.iter().zip_eq_fast(row.iter()) {
let key = field.name.clone();
@@ -408,10 +455,26 @@ fn record_to_json(row: RowRef<'_>, schema: Vec<Field>) -> Result<Map<String, Val
Ok(mappings)
}

fn pk_to_json(
row: RowRef<'_>,
schema: &[Field],
pk_indices: &[usize],
) -> Result<Map<String, Value>> {
let mut mappings = Map::with_capacity(schema.len());
for idx in pk_indices {
let field = &schema[*idx];
let key = field.name.clone();
let value = datum_to_json_object(field, row.datum_at(*idx))
.map_err(|e| SinkError::JsonParse(e.to_string()))?;
mappings.insert(key, value);
}
Ok(mappings)
}

pub fn chunk_to_json(chunk: StreamChunk, schema: &Schema) -> Result<Vec<String>> {
let mut records: Vec<String> = Vec::with_capacity(chunk.capacity());
for (_, row) in chunk.rows() {
let record = Value::Object(record_to_json(row, schema.fields.clone())?);
let record = Value::Object(record_to_json(row, &schema.fields)?);
records.push(record.to_string());
}

@@ -572,8 +635,9 @@ mod test {
type_name: "".into(),
},
]);
let pk_indices = vec![];
let kafka_config = KafkaConfig::from_hashmap(properties)?;
let mut sink = KafkaSink::<true>::new(kafka_config.clone(), schema)
let mut sink = KafkaSink::<true>::new(kafka_config.clone(), schema, pk_indices)
.await
.unwrap();

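For context on what the new upsert path writes to Kafka: the message key is built from the primary-key columns only (pk_to_json), the value is the whole row as a JSON object for inserts and updates (record_to_json), and a null value acts as a delete tombstone. Below is a minimal standalone sketch of that encoding using plain serde_json; the Row type and helper functions are simplified stand-ins for illustration, not the RisingWave types.

```rust
use serde_json::{json, Map, Value};

/// Simplified stand-in for a row: column names paired with JSON values.
type Row = Vec<(String, Value)>;

/// Build the Kafka message key from the primary-key columns only.
fn pk_key(row: &Row, pk_indices: &[usize]) -> Value {
    let mut key = Map::new();
    for &i in pk_indices {
        let (name, value) = &row[i];
        key.insert(name.clone(), value.clone());
    }
    Value::Object(key)
}

/// Build the Kafka message value: the whole row for an insert/update,
/// or `null` (a tombstone) for a delete.
fn upsert_value(row: &Row, is_delete: bool) -> Value {
    if is_delete {
        return Value::Null;
    }
    let mut obj = Map::new();
    for (name, value) in row {
        obj.insert(name.clone(), value.clone());
    }
    Value::Object(obj)
}

fn main() {
    let row: Row = vec![
        ("id".to_string(), json!(1)),
        ("v".to_string(), json!("hello")),
    ];
    let pk_indices = [0usize];

    // Insert/update: key = {"id":1}, value = {"id":1,"v":"hello"}
    println!("{} -> {}", pk_key(&row, &pk_indices), upsert_value(&row, false));
    // Delete: key = {"id":1}, value = null (tombstone)
    println!("{} -> {}", pk_key(&row, &pk_indices), upsert_value(&row, true));
}
```

Running this sketch prints {"id":1} -> {"id":1,"v":"hello"} for the upsert and {"id":1} -> null for the delete, which matches the key/value shape an upsert-kafka consumer keyed on the primary key would expect.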
9 changes: 7 additions & 2 deletions src/connector/src/sink/mod.rs
@@ -41,6 +41,7 @@ use crate::ConnectorParams;
pub const SINK_FORMAT_OPTION: &str = "format";
pub const SINK_FORMAT_APPEND_ONLY: &str = "append_only";
pub const SINK_FORMAT_DEBEZIUM: &str = "debezium";
pub const SINK_FORMAT_UPSERT: &str = "upsert";
pub const SINK_USER_FORCE_APPEND_ONLY_OPTION: &str = "force_append_only";

#[async_trait]
@@ -132,10 +133,14 @@ impl SinkImpl {
SinkConfig::Kafka(cfg) => {
if sink_type.is_append_only() {
// Append-only Kafka sink
SinkImpl::Kafka(Box::new(KafkaSink::<true>::new(*cfg, schema).await?))
SinkImpl::Kafka(Box::new(
KafkaSink::<true>::new(*cfg, schema, pk_indices).await?,
))
} else {
// Upsert Kafka sink
SinkImpl::UpsertKafka(Box::new(KafkaSink::<false>::new(*cfg, schema).await?))
SinkImpl::UpsertKafka(Box::new(
KafkaSink::<false>::new(*cfg, schema, pk_indices).await?,
))
}
}
SinkConfig::Console(cfg) => SinkImpl::Console(Box::new(ConsoleSink::new(cfg, schema)?)),

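To summarize the dispatch this commit sets up: an append-only Kafka sink keeps the existing insert-only path, while a non-append-only sink now branches on the format option between the Debezium envelope and the new upsert encoding, and any other format string is rejected at config-validation time. The following is an illustrative sketch only, not the connector's code; the constant values mirror those defined in src/connector/src/sink/mod.rs, while the enum and function are invented for the example.

```rust
// Sketch of the accepted `format` values and what each one selects.
const SINK_FORMAT_APPEND_ONLY: &str = "append_only";
const SINK_FORMAT_DEBEZIUM: &str = "debezium";
const SINK_FORMAT_UPSERT: &str = "upsert";

#[derive(Debug)]
enum WritePath {
    AppendOnly, // inserts only, generated message key
    Debezium,   // before/after change envelopes
    Upsert,     // pk-keyed rows, null value as delete tombstone
}

fn pick_path(format: &str) -> Result<WritePath, String> {
    match format {
        SINK_FORMAT_APPEND_ONLY => Ok(WritePath::AppendOnly),
        SINK_FORMAT_DEBEZIUM => Ok(WritePath::Debezium),
        SINK_FORMAT_UPSERT => Ok(WritePath::Upsert),
        other => Err(format!(
            "format must be append_only, debezium, or upsert, got `{other}`"
        )),
    }
}

fn main() {
    println!("{:?}", pick_path("upsert"));   // Ok(Upsert)
    println!("{:?}", pick_path("protobuf")); // Err(...)
}
```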
