feat: kafka-upsert with json,avro format (#8111)
To support `upsert-kafka` in a manner similar to [how Flink does](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/upsert-kafka/), the key field of a Kafka message carries the values of the primary key columns. If the message's value field is non-empty, the row is inserted or updated; if the value field is empty, the row is deleted. This behavior is not tied to any specific row format.

A Kafka connector with the `upsert` property enabled produces byte-encoded `UpsertMessage`s as its `SourceMessage`s, rather than raw Kafka message values.
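
For illustration, here is a minimal sketch (not code from this commit) of how a source reader could wrap a Kafka record's key and value into such a byte-encoded `UpsertMessage` using `bincode`; the helper name `encode_upsert_message` is hypothetical, while the struct mirrors the one added in `src/connector/src/common.rs`:

```rust
use std::borrow::Cow;

use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
pub struct UpsertMessage<'a> {
    #[serde(borrow)]
    pub primary_key: Cow<'a, [u8]>,
    #[serde(borrow)]
    pub record: Cow<'a, [u8]>,
}

/// Hypothetical helper: pack a Kafka message's key and (possibly absent) value
/// into the byte payload that the upsert-aware parsers consume.
fn encode_upsert_message(key: &[u8], value: Option<&[u8]>) -> Vec<u8> {
    let msg = UpsertMessage {
        primary_key: Cow::Borrowed(key),
        // A tombstone (absent value) is encoded as an empty record, which the
        // parser later interprets as a DELETE.
        record: Cow::Borrowed(value.unwrap_or(&[])),
    };
    bincode::serialize(&msg).expect("UpsertMessage serialization should not fail")
}
```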

The row formats prefixed with `UPSERT_` know that these `SourceMessage`s carry not only the Kafka message value field but also the key field, which supplies the primary key columns, and handle them accordingly.
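
On the parsing side, the decision made by the `UPSERT_` formats boils down to the simplified sketch below; `classify` is a hypothetical name reusing the `UpsertMessage` type sketched above, and the real parsers additionally decode the Avro/JSON payload and fill the stream chunk row writer:

```rust
enum Op {
    Insert,
    Delete,
}

/// Simplified decision logic: an empty record means the row identified by the
/// primary key is deleted; otherwise the record is inserted or updated.
fn classify(payload: &[u8]) -> Result<(Vec<u8>, Op), bincode::Error> {
    let msg: UpsertMessage<'_> = bincode::deserialize(payload)?;
    Ok(if msg.record.is_empty() {
        (msg.primary_key.into_owned(), Op::Delete)
    } else {
        (msg.record.into_owned(), Op::Insert)
    })
}
```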

Approved-By: waruto210

Co-Authored-By: idx0-dev <124041366+idx0-dev@users.noreply.github.com>
Co-Authored-By: waruto <wmc314@outlook.com>
adevday and waruto210 authored Feb 23, 2023
1 parent dd7fc13 commit f5f8f83
Showing 20 changed files with 505 additions and 73 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

5 changes: 5 additions & 0 deletions dashboard/proto/gen/catalog.ts

12 changes: 12 additions & 0 deletions dashboard/proto/gen/plan_common.ts

1 change: 1 addition & 0 deletions proto/catalog.proto
@@ -30,6 +30,7 @@ message StreamSourceInfo {
string proto_message_name = 4;
int32 csv_delimiter = 5;
bool csv_has_header = 6;
string upsert_avro_primary_key = 7;
}

message Source {
2 changes: 2 additions & 0 deletions proto/plan_common.proto
@@ -81,4 +81,6 @@ enum RowFormatType {
CSV = 7;
NATIVE = 8;
DEBEZIUM_AVRO = 9;
UPSERT_JSON = 10;
UPSERT_AVRO = 11;
}
2 changes: 1 addition & 1 deletion src/connector/Cargo.toml
@@ -22,6 +22,7 @@ aws-sdk-kinesis = { workspace = true }
aws-sdk-s3 = { workspace = true }
aws-smithy-http = { workspace = true }
aws-types = { workspace = true }
bincode = "1"
byteorder = "1"
bytes = { version = "1", features = ["serde"] }
chrono = { version = "0.4", default-features = false, features = ["clock", "std"] }
@@ -66,7 +67,6 @@ tonic = { version = "0.2", package = "madsim-tonic" }
tracing = "0.1"
url = "2"
urlencoding = "2"

[target.'cfg(not(madsim))'.dependencies]
workspace-hack = { path = "../workspace-hack" }

10 changes: 10 additions & 0 deletions src/connector/src/common.rs
@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::borrow::Cow;

use aws_sdk_kinesis::Client as KinesisClient;
use http::Uri;
use rdkafka::ClientConfig;
@@ -199,3 +201,11 @@ impl KinesisCommon {
Ok(KinesisClient::from_conf(builder.build()))
}
}

#[derive(Debug, Serialize, Deserialize)]
pub struct UpsertMessage<'a> {
#[serde(borrow)]
pub primary_key: Cow<'a, [u8]>,
#[serde(borrow)]
pub record: Cow<'a, [u8]>,
}
160 changes: 134 additions & 26 deletions src/connector/src/parser/avro/parser.rs
@@ -26,6 +26,7 @@ use url::Url;

use super::schema_resolver::*;
use super::util::{extract_inner_field_schema, from_avro_value};
use crate::common::UpsertMessage;
use crate::impl_common_parser_logic;
use crate::parser::avro::util::avro_field_to_column_desc;
use crate::parser::schema_registry::{extract_schema_id, Client};
@@ -38,34 +39,59 @@ impl_common_parser_logic!(AvroParser);
#[derive(Debug)]
pub struct AvroParser {
schema: Arc<Schema>,
key_schema: Option<Arc<Schema>>,
schema_resolver: Option<Arc<ConfluentSchemaResolver>>,
rw_columns: Vec<SourceColumnDesc>,
error_ctx: SourceErrorContext,
upsert_primary_key_column_name: Option<String>,
}

#[derive(Debug, Clone)]
pub struct AvroParserConfig {
pub schema: Arc<Schema>,
pub key_schema: Option<Arc<Schema>>,
pub schema_resolver: Option<Arc<ConfluentSchemaResolver>>,
pub upsert_primary_key_column_name: Option<String>,
}

impl AvroParserConfig {
pub async fn new(
props: &HashMap<String, String>,
schema_location: &str,
use_schema_registry: bool,
enable_upsert: bool,
upsert_primary_key_column_name: Option<String>,
) -> Result<Self> {
let url = Url::parse(schema_location).map_err(|e| {
InternalError(format!("failed to parse url ({}): {}", schema_location, e))
})?;
let (schema, schema_resolver) = if use_schema_registry {
if use_schema_registry {
let kafka_topic = get_kafka_topic(props)?;
let client = Client::new(url, props)?;
let (schema, resolver) =
ConfluentSchemaResolver::new(format!("{}-value", kafka_topic).as_str(), client)
.await?;
(Arc::new(schema), Some(Arc::new(resolver)))
let resolver = ConfluentSchemaResolver::new(client);

Ok(Self {
schema: resolver
.get_by_subject_name(&format!("{}-value", kafka_topic))
.await?,
key_schema: if enable_upsert {
Some(
resolver
.get_by_subject_name(&format!("{}-key", kafka_topic))
.await?,
)
} else {
None
},
schema_resolver: Some(Arc::new(resolver)),
upsert_primary_key_column_name,
})
} else {
if enable_upsert {
return Err(RwError::from(InternalError(
"avro upsert without schema registry is not supported".to_string(),
)));
}
let schema_content = match url.scheme() {
"file" => read_schema_from_local(url.path()),
"s3" => read_schema_from_s3(&url, props).await,
@@ -78,12 +104,28 @@ impl AvroParserConfig {
let schema = Schema::parse_str(&schema_content).map_err(|e| {
RwError::from(InternalError(format!("Avro schema parse error {}", e)))
})?;
(Arc::new(schema), None)
};
Ok(Self {
schema,
schema_resolver,
})
Ok(Self {
schema: Arc::new(schema),
key_schema: None,
schema_resolver: None,
upsert_primary_key_column_name: None,
})
}
}

pub fn extract_pks(&self) -> Result<Vec<ColumnDesc>> {
if let Some(Schema::Record { fields, .. }) = self.key_schema.as_deref() {
let mut index = 0;
let fields = fields
.iter()
.map(|field| avro_field_to_column_desc(&field.name, &field.schema, &mut index))
.collect::<Result<Vec<_>>>()?;
Ok(fields)
} else {
Err(RwError::from(InternalError(
"schema invalid, record required".into(),
)))
}
}

pub fn map_to_columns(&self) -> Result<Vec<ColumnDesc>> {
@@ -112,30 +154,65 @@ impl AvroParser {
) -> Result<Self> {
let AvroParserConfig {
schema,
key_schema,
schema_resolver,
upsert_primary_key_column_name,
} = config;
Ok(Self {
schema,
key_schema,
schema_resolver,
rw_columns,
error_ctx,
upsert_primary_key_column_name,
})
}

/// The presence of a `key_schema` implies that upsert is enabled.
fn is_enable_upsert(&self) -> bool {
self.key_schema.is_some()
}

pub(crate) async fn parse_inner(
&self,
payload: &[u8],
mut writer: SourceStreamChunkRowWriter<'_>,
) -> Result<WriteGuard> {
enum Op {
Insert,
Delete,
}

let (_payload, op) = if self.is_enable_upsert() {
let msg: UpsertMessage<'_> = bincode::deserialize(payload).map_err(|e| {
RwError::from(ProtocolError(format!(
"extract payload err {:?}, you may need to check the 'upsert' parameter",
e
)))
})?;
if !msg.record.is_empty() {
(msg.record, Op::Insert)
} else {
(msg.primary_key, Op::Delete)
}
} else {
(payload.into(), Op::Insert)
};

// parse payload to avro value
// if use confluent schema, get writer schema from confluent schema registry
let avro_value = if let Some(resolver) = &self.schema_resolver {
let (schema_id, mut raw_payload) = extract_schema_id(payload)?;
let (schema_id, mut raw_payload) = extract_schema_id(&_payload)?;
let writer_schema = resolver.get(schema_id).await?;
from_avro_datum(writer_schema.as_ref(), &mut raw_payload, Some(&self.schema))
let reader_schema = if matches!(op, Op::Delete) {
self.key_schema.as_deref()
} else {
Some(&*self.schema)
};
from_avro_datum(writer_schema.as_ref(), &mut raw_payload, reader_schema)
.map_err(|e| RwError::from(ProtocolError(e.to_string())))?
} else {
let mut reader = Reader::with_schema(&self.schema, payload)
let mut reader = Reader::with_schema(&self.schema, payload as &[u8])
.map_err(|e| RwError::from(ProtocolError(e.to_string())))?;
match reader.next() {
Some(Ok(v)) => v,
@@ -148,18 +225,14 @@ impl AvroParser {
}
};

// parse the value to rw value
// the avro can be a key or a value
if let Value::Record(fields) = avro_value {
writer.insert(|column| {
let tuple = fields
.iter()
.find(|val| column.name.eq(&val.0))
.ok_or_else(|| {
RwError::from(InternalError(format!(
"no field named {} in avro msg",
column.name
)))
})?;
let fill = |column: &SourceColumnDesc| {
let tuple = match fields.iter().find(|val| column.name.eq(&val.0)) {
None => return Ok(None),
Some(tup) => tup,
};

let field_schema = extract_inner_field_schema(&self.schema, Some(&column.name))?;
from_avro_value(tuple.1.clone(), field_schema).map_err(|e| {
tracing::error!(
Expand All @@ -169,6 +242,41 @@ impl AvroParser {
);
e
})
};
match op {
Op::Insert => writer.insert(fill),
Op::Delete => writer.delete(fill),
}
} else if self.upsert_primary_key_column_name.is_some()
&& matches!(op, Op::Delete)
&& matches!(
avro_value,
Value::Boolean(_)
| Value::String(_)
| Value::Int(_)
| Value::Long(_)
| Value::Float(_)
| Value::Decimal(_)
| Value::Date(_)
| Value::TimestampMillis(_)
| Value::TimestampMicros(_)
| Value::Duration(_)
)
{
writer.delete(|desc| {
if &desc.name != self.upsert_primary_key_column_name.as_ref().unwrap() {
Ok(None)
} else {
from_avro_value(avro_value.clone(), self.key_schema.as_deref().unwrap())
.map_err(|e| {
tracing::error!(
"failed to process value ({}): {}",
String::from_utf8_lossy(payload),
e
);
e
})
}
})
} else {
Err(RwError::from(ProtocolError(
@@ -255,7 +363,7 @@ mod test {

async fn new_avro_conf_from_local(file_name: &str) -> error::Result<AvroParserConfig> {
let schema_path = "file://".to_owned() + &test_data_path(file_name);
AvroParserConfig::new(&HashMap::new(), schema_path.as_str(), false).await
AvroParserConfig::new(&HashMap::new(), schema_path.as_str(), false, false, None).await
}

async fn new_avro_parser_from_local(file_name: &str) -> error::Result<AvroParser> {
