
Support sinking to relational databases via Debezium #235

Merged: 9 commits from debezium_fixes into master on Aug 9, 2023

Conversation

@mwylde (Member) commented Aug 8, 2023

While investigating #233, I noticed that although we have support for outputting Debezium-formatted JSON, a few issues prevented us from actually writing to a Postgres (or other RDBMS) sink via Debezium:

  • Debezium requires a Kafka Connect schema, but we don't support any of the ways of setting one (via Avro, the Confluent Schema Registry, or as an "embedded" schema in the message).
  • We don't have a way to emit timestamps that can be ingested by Debezium.

This PR addresses both issues: it allows users to specify that output JSON should have an embedded schema (currently only possible in SQL, by setting 'json.include_schema' = true on the connection table), and it properly emits millisecond Unix timestamps when the format is 'debezium_json'. This behavior can be overridden by setting the json.timestamp_format option.
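
For example, a connection table that emits Debezium-ready JSON with an embedded schema might look like the following. This is an illustrative sketch, not taken from the PR: the table name, topic, and server address are invented, and the 'unix_millis' value for json.timestamp_format is an assumption.

CREATE TABLE postgres_sink (
  id bigint,
  count bigint
) WITH (
  connector = 'kafka',
  bootstrap_servers = 'localhost:9092',
  type = 'sink',
  topic = 'postgres_sink',
  format = 'debezium_json',
  -- embed the Kafka Connect schema in each message
  'json.include_schema' = true,
  -- optional override of the timestamp encoding (value name assumed)
  'json.timestamp_format' = 'unix_millis'
);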

As part of addressing this, I also began a redesign of the format system, as the existing "serialization_mode" spec was too limited to express things like "embed_schema". This will let us emit more formats (like Avro and Protobuf) in a structured way in the future.

@mwylde mwylde requested a review from jacksonrnewhouse August 8, 2023 03:25
arroyo-api/src/connection_tables.rs
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct OperatorConfig {
Contributor:

This seems to be here so that it can be shared between arroyo-worker and arroyo-connectors. Is that right? I've put things in arroyo-types for similar reasons; should we standardize on one crate for that?

Member Author:

It's in arroyo-rpc instead of arroyo-types because it needs serde_json::Value which arroyo-types doesn't currently have, and ideally we don't add additional crate dependencies to arroyo-types.

Generally I think arroyo-rpc makes sense for values that are used in communication between the various parts of our system.

@@ -30,7 +30,6 @@ pub struct SqlSink {
#[derive(Clone, Debug)]
pub enum SinkUpdateType {
Allow,
Disallow,
Contributor:

The point of this was to reject updating SQL queries if the sink did not support updates. With this change, the following query writes Debezium-formatted records to Kafka:

CREATE TABLE kafka_raw_sink (
  sum bigint
) WITH (
  connector = 'kafka',
  bootstrap_servers = 'localhost:9092',
  type = 'sink',
  topic = 'raw_sink',
  format = 'json'
);
INSERT INTO kafka_raw_sink
SELECT Count(*) FROM nexmark;

Member Author:

I've re-added this logic so that we reject queries that insert updates into non-updating sinks, and added a test.
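
For illustration (a sketch, not part of the PR; the sink table name is invented), the updating query above would now be rejected, and would instead need to target a Debezium-formatted sink:

CREATE TABLE kafka_debezium_sink (
  sum bigint
) WITH (
  connector = 'kafka',
  bootstrap_servers = 'localhost:9092',
  type = 'sink',
  topic = 'raw_sink',
  -- 'debezium_json' makes this an updating sink, so the INSERT below is accepted
  format = 'debezium_json'
);
INSERT INTO kafka_debezium_sink
SELECT Count(*) FROM nexmark;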

@@ -341,11 +341,14 @@ pub fn parse_and_get_program_sync(
 operator: "GrpcSink::<#in_k, #in_t>".to_string(),
 config: "{}".to_string(),
 description: "WebSink".to_string(),
-serialization_mode: if insert.is_updating() {
-    arroyo_datastream::SerializationMode::DebeziumJson
+format: Some(if insert.is_updating() {
Contributor:

This would be cleaner as just

Some(Format::Json(JsonFormat {
    debezium: insert.is_updating(),
    ..Default::default()
}))

I think

@@ -150,7 +151,7 @@ impl StructDef {
 StructDef { name: None, fields }
 }

-pub fn generate_record_batch_builder(&self) -> TokenStream {
+pub fn generate_serializer_items(&self) -> TokenStream {
Contributor:

I think in the near future we should standardize how we decide which code needs to be generated in each place. Right now we get there through a number of different pieces of logic.

Member Author:

yeah

@@ -256,6 +278,7 @@ pub struct StructField {
 pub renamed_from: Option<String>,
 pub original_type: Option<String>,
 pub expression: Option<Box<Expression>>,
+pub format: Option<Arc<Format>>,
Contributor:

I don't like adding this just for the one serialization case, but it does mean that we have different struct hashes, so differently named structs.

Could we put it on the StructDef instead?

Member Author:

moved to StructDef

"protobuf" => return Err("protobuf is not yet supported".to_string()),
"avro" => return Err("avro is not yet supported".to_string()),
"raw_string" => return Err("raw_string is not yet supported".to_string()),
"parquet" => Format::Parquet(ParquetFormat {}),
Contributor:

It isn't clear to me where the line is between "format" settings and other settings. Should the compression used by parquet be a format setting?

Member Author:

Yes, I think the Parquet options should be moved in here, but I punted on that for now.

@mwylde mwylde force-pushed the debezium_fixes branch 2 times, most recently from eada29a to d58303a on August 8, 2023 22:39
@mwylde mwylde marked this pull request as ready for review August 8, 2023 22:39
@mwylde (Member Author) commented Aug 8, 2023

This is now ready for review

@mwylde mwylde enabled auto-merge (squash) August 9, 2023 00:12
@mwylde mwylde merged commit b114097 into master Aug 9, 2023