From 53a1f0c6c14190c37c0b28320db049f822436a4f Mon Sep 17 00:00:00 2001 From: Xiaojian Sun Date: Tue, 25 Jul 2023 10:09:24 +0800 Subject: [PATCH] [Feature][connector][kafka] Support read debezium format message from kafka (#5066) --- .github/workflows/backend.yml | 2 +- docs/en/connector-v2/formats/debezium-json.md | 107 +++++ docs/en/connector-v2/sink/Kafka.md | 16 +- docs/en/connector-v2/source/kafka.md | 17 +- release-note.md | 12 +- .../seatunnel/kafka/config/Config.java | 8 +- .../seatunnel/kafka/config/MessageFormat.java | 1 + .../DefaultSeaTunnelRowSerializer.java | 3 + .../seatunnel/kafka/source/KafkaSource.java | 10 + .../kafka/source/KafkaSourceFactory.java | 1 + .../connector/kafka/DebeziumToKafkaIT.java | 418 ++++++++++++++++++ .../resources/debezium/register-mysql.json | 16 + .../kafkasource_debezium_cdc_to_pgsql.conf | 62 +++ .../kafkasource_debezium_to_kafka.conf | 57 +++ .../format/json/JsonFormatOptions.java | 13 +- .../DebeziumJsonDeserializationSchema.java | 168 +++++++ .../debezium/DebeziumJsonFormatFactory.java | 70 +++ .../debezium/DebeziumJsonFormatOptions.java | 53 +++ .../DebeziumJsonSerializationSchema.java | 80 ++++ ...apache.seatunnel.api.table.factory.Factory | 1 + .../debezium/DebeziumJsonSerDeSchemaTest.java | 163 +++++++ .../src/test/resources/debezium-data.txt | 16 + 22 files changed, 1270 insertions(+), 24 deletions(-) create mode 100644 docs/en/connector-v2/formats/debezium-json.md create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/DebeziumToKafkaIT.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/debezium/register-mysql.json create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_debezium_cdc_to_pgsql.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_debezium_to_kafka.conf 
create mode 100644 seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java create mode 100644 seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonFormatFactory.java create mode 100644 seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonFormatOptions.java create mode 100644 seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerializationSchema.java create mode 100644 seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerDeSchemaTest.java create mode 100644 seatunnel-formats/seatunnel-format-json/src/test/resources/debezium-data.txt diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml index fbe37acece5..6da4f4a5ab6 100644 --- a/.github/workflows/backend.yml +++ b/.github/workflows/backend.yml @@ -564,7 +564,7 @@ jobs: matrix: java: [ '8', '11' ] os: [ 'ubuntu-latest' ] - timeout-minutes: 90 + timeout-minutes: 150 steps: - uses: actions/checkout@v2 - name: Set up JDK ${{ matrix.java }} diff --git a/docs/en/connector-v2/formats/debezium-json.md b/docs/en/connector-v2/formats/debezium-json.md new file mode 100644 index 00000000000..4c40a0298e4 --- /dev/null +++ b/docs/en/connector-v2/formats/debezium-json.md @@ -0,0 +1,107 @@ +# Debezium Format + +Changelog-Data-Capture Format: Serialization Schema Format: Deserialization Schema + +Debezium is a set of distributed services to capture changes in your databases so that your applications can see those changes and respond to them. Debezium records all row-level changes within each database table in a *change event stream*, and applications simply read these streams to see the change events in the same order in which they occurred. 
+ +SeaTunnel can interpret Debezium JSON messages as INSERT/UPDATE/DELETE messages in the SeaTunnel system. This feature is useful in many cases, such as + + synchronizing incremental data from databases to other systems + auditing logs + real-time materialized views on databases + temporal join changing history of a database table and so on. + +SeaTunnel can also encode its INSERT/UPDATE/DELETE messages as Debezium JSON messages and emit them to storage such as Kafka. + +# Format Options + +| option | default | required | Description | +|-----------------------------------|---------|----------|------------------------------------------------------------------------------------------------------| +| format | (none) | yes | Specify what format to use, here should be 'debezium_json'. | +| debezium-json.ignore-parse-errors | false | no | Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors. | + +# How to use Debezium format + +## Kafka uses example + +Debezium provides a unified format for changelog, here is a simple example for an update operation captured from a MySQL products table: + +```bash +{ + "before": { + "id": 111, + "name": "scooter", + "description": "Big 2-wheel scooter ", + "weight": 5.18 + }, + "after": { + "id": 111, + "name": "scooter", + "description": "Big 2-wheel scooter ", + "weight": 5.17 + }, + "source": { + "version": "1.1.1.Final", + "connector": "mysql", + "name": "dbserver1", + "ts_ms": 1589362330000, + "snapshot": "false", + "db": "inventory", + "table": "products", + "server_id": 223344, + "gtid": null, + "file": "mysql-bin.000003", + "pos": 2090, + "row": 0, + "thread": 2, + "query": null + }, + "op": "u", + "ts_ms": 1589362330904, + "transaction": null +} +``` + +Note: please refer to the Debezium documentation for the meaning of each field. + +The MySQL products table has 4 columns (id, name, description and weight). 
+The above JSON message is an update change event on the products table where the weight value of the row with id = 111 is changed from 5.18 to 5.17. +Assuming the messages have been synchronized to Kafka topic products_binlog, then we can use the following Seatunnel conf to consume this topic and interpret the change events by Debezium format. + +```bash +env { + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + Kafka { + bootstrap.servers = "kafkaCluster:9092" + topic = "products_binlog" + result_table_name = "kafka_name" + start_mode = earliest + schema = { + fields { + id = "int" + name = "string" + description = "string" + weight = "string" + } + } + format = debezium_json + } + +} + +transform { +} + +sink { + Kafka { + bootstrap.servers = "kafkaCluster:9092" + topic = "consume-binlog" + format = debezium_json + } +} +``` + diff --git a/docs/en/connector-v2/sink/Kafka.md b/docs/en/connector-v2/sink/Kafka.md index 4dbd3a84ce7..f971e5390b0 100644 --- a/docs/en/connector-v2/sink/Kafka.md +++ b/docs/en/connector-v2/sink/Kafka.md @@ -108,8 +108,10 @@ Kafka distinguishes different transactions by different transactionId. This para ### format -Data format. The default format is json. Optional text format. The default field separator is ",". -If you customize the delimiter, add the "field_delimiter" option. +Data format. The default format is json. Optional text format, canal-json and debezium-json. +If you use json or text format. The default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option. +If you use canal format, please refer to [canal-json](../formats/canal-json.md) for details. +If you use debezium format, please refer to [debezium-json](../formats/debezium-json.md) for details. 
### field_delimiter @@ -209,8 +211,10 @@ sink { ### next version -- [Improve] Support to specify multiple partition keys [3230](https://github.com/apache/seatunnel/pull/3230) -- [Improve] Add text format for kafka sink connector [3711](https://github.com/apache/seatunnel/pull/3711) -- [Improve] Support extract topic from SeaTunnelRow fields [3742](https://github.com/apache/seatunnel/pull/3742) -- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/seatunnel/pull/3719) +- [Improve] Support to specify multiple partition keys [3230](https://github.com/apache/incubator-seatunnel/pull/3230) +- [Improve] Add text format for kafka sink connector [3711](https://github.com/apache/incubator-seatunnel/pull/3711) +- [Improve] Support extract topic from SeaTunnelRow fields [3742](https://github.com/apache/incubator-seatunnel/pull/3742) +- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719) +- [Improve] Support read canal format message [3950](https://github.com/apache/incubator-seatunnel/pull/3950) +- [Improve] Support read debezium format message [3981](https://github.com/apache/incubator-seatunnel/pull/3981) diff --git a/docs/en/connector-v2/source/kafka.md b/docs/en/connector-v2/source/kafka.md index 06f60af6d87..2ed6ec6f12e 100644 --- a/docs/en/connector-v2/source/kafka.md +++ b/docs/en/connector-v2/source/kafka.md @@ -73,8 +73,10 @@ The structure of the data, including field names and field types. ## format -Data format. The default format is json. Optional text format. The default field separator is ", ". -If you customize the delimiter, add the "field_delimiter" option. +Data format. The default format is json. Optional text format, canal-json and debezium-json. +If you use json or text format. The default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option. 
+If you use canal format, please refer to [canal-json](../formats/canal-json.md) for details. +If you use debezium format, please refer to [debezium-json](../formats/debezium-json.md) for details. ## format_error_handle_way @@ -221,9 +223,10 @@ source { ### Next Version -- [Improve] Support setting read starting offset or time at startup config ([3157](https://github.com/apache/seatunnel/pull/3157)) -- [Improve] Support for dynamic discover topic & partition in streaming mode ([3125](https://github.com/apache/seatunnel/pull/3125)) -- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/seatunnel/pull/3719) -- [Bug] Fixed the problem that parsing the offset format failed when the startup mode was offset([3810](https://github.com/apache/seatunnel/pull/3810)) -- [Feature] Kafka source supports data deserialization failure skipping([4364](https://github.com/apache/seatunnel/pull/4364)) +- [Improve] Support setting read starting offset or time at startup config ([3157](https://github.com/apache/incubator-seatunnel/pull/3157)) +- [Improve] Support for dynamic discover topic & partition in streaming mode ([3125](https://github.com/apache/incubator-seatunnel/pull/3125)) +- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719) +- [Bug] Fixed the problem that parsing the offset format failed when the startup mode was offset([3810](https://github.com/apache/incubator-seatunnel/pull/3810)) +- [Improve] Support read canal format message [3950](https://github.com/apache/incubator-seatunnel/pull/3950) +- [Improve] Support read debezium format message [3981](https://github.com/apache/incubator-seatunnel/pull/3981) diff --git a/release-note.md b/release-note.md index 68d14e609f3..0e84da433c2 100644 --- a/release-note.md +++ b/release-note.md @@ -3,9 +3,19 @@ ## Bug fix ### Core - - [Core] [API] Fixed generic class loss for lists (#4421) - [Core] [API] Fix parse nested row data type 
key changed upper (#4459) +- [Starter][Flink]Support transform-v2 for flink #3396 +- [Flink] Support flink 1.14.x #3963 +### Transformer +- [Spark] Support transform-v2 for spark (#3409) +- [ALL]Add FieldMapper Transform #3781 +### Connectors +- [Elasticsearch] Support https protocol & compatible with opensearch +- [Hbase] Add hbase sink connector #4049 +### Formats +- [Canal]Support read canal format message #3950 +- [Debezium]Support read debezium format message #3981 ### Connector-V2 diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java index 2dffda4f48a..f126e563fbb 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java @@ -26,8 +26,6 @@ public class Config { public static final String CONNECTOR_IDENTITY = "Kafka"; - public static final String REPLICATION_FACTOR = "replication.factor"; - /** The default field delimiter is “,” */ public static final String DEFAULT_FIELD_DELIMITER = ","; @@ -99,6 +97,12 @@ public class Config { "Data format. The default format is json. Optional text format. The default field separator is \", \". 
" + "If you customize the delimiter, add the \"field_delimiter\" option."); + public static final Option DEBEZIUM_RECORD_INCLUDE_SCHEMA = + Options.key("debezium_record_include_schema") + .booleanType() + .defaultValue(true) + .withDescription("Does the debezium record carry a schema."); + public static final Option FIELD_DELIMITER = Options.key("field_delimiter") .stringType() diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java index 65b5cc27699..1ef29f6322a 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java @@ -21,5 +21,6 @@ public enum MessageFormat { JSON, TEXT, CANAL_JSON, + DEBEZIUM_JSON, COMPATIBLE_DEBEZIUM_JSON } diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java index 06005de0035..f8974d0f1a9 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java @@ -28,6 +28,7 @@ import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonSerializationSchema; import org.apache.seatunnel.format.json.JsonSerializationSchema; import org.apache.seatunnel.format.json.canal.CanalJsonSerializationSchema; +import 
org.apache.seatunnel.format.json.debezium.DebeziumJsonSerializationSchema; import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException; import org.apache.seatunnel.format.text.TextSerializationSchema; @@ -219,6 +220,8 @@ private static SerializationSchema createSerializationSchema( .build(); case CANAL_JSON: return new CanalJsonSerializationSchema(rowType); + case DEBEZIUM_JSON: + return new DebeziumJsonSerializationSchema(rowType); case COMPATIBLE_DEBEZIUM_JSON: return new CompatibleDebeziumJsonSerializationSchema(rowType, isKey); default: diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java index 741d7521643..30878e82a2c 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java @@ -47,6 +47,7 @@ import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState; import org.apache.seatunnel.format.json.JsonDeserializationSchema; import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema; +import org.apache.seatunnel.format.json.debezium.DebeziumJsonDeserializationSchema; import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException; import org.apache.seatunnel.format.text.TextDeserializationSchema; import org.apache.seatunnel.format.text.constant.TextFormatConstant; @@ -62,6 +63,7 @@ import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS; import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.COMMIT_ON_CHECKPOINT; import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONSUMER_GROUP; +import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEBEZIUM_RECORD_INCLUDE_SCHEMA; import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FIELD_DELIMITER; import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER; import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT; @@ -266,6 +268,14 @@ private void setDeserialization(Config config) { .setIgnoreParseErrors(true) .build(); break; + case DEBEZIUM_JSON: + boolean includeSchema = DEBEZIUM_RECORD_INCLUDE_SCHEMA.defaultValue(); + if (config.hasPath(DEBEZIUM_RECORD_INCLUDE_SCHEMA.key())) { + includeSchema = config.getBoolean(DEBEZIUM_RECORD_INCLUDE_SCHEMA.key()); + } + deserializationSchema = + new DebeziumJsonDeserializationSchema(typeInfo, true, includeSchema); + break; default: throw new SeaTunnelJsonFormatException( CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + format); diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java index daa75385e4d..21057040ec2 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java @@ -46,6 +46,7 @@ public OptionRule optionRule() { Config.KAFKA_CONFIG, Config.SCHEMA, Config.FORMAT, + Config.DEBEZIUM_RECORD_INCLUDE_SCHEMA, Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS) .conditional(Config.START_MODE, StartMode.TIMESTAMP, Config.START_MODE_TIMESTAMP) .conditional( diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/DebeziumToKafkaIT.java 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/DebeziumToKafkaIT.java new file mode 100644 index 00000000000..e76a4459963 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/DebeziumToKafkaIT.java @@ -0,0 +1,418 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.e2e.connector.kafka; + +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion; +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; +import org.apache.seatunnel.e2e.common.container.EngineType; +import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; +import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.shaded.org.awaitility.Awaitility; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.DockerLoggerFactory; +import org.testcontainers.utility.MountableFile; + +import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.SocketException; 
+import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.awaitility.Awaitility.given; +import static org.testcontainers.shaded.org.awaitility.Awaitility.await; + +@DisabledOnContainer( + value = {}, + type = {EngineType.SEATUNNEL, EngineType.SPARK}) +@Slf4j +public class DebeziumToKafkaIT extends TestSuiteBase implements TestResource { + + private static final Logger LOG = LoggerFactory.getLogger(DebeziumToKafkaIT.class); + + private static GenericContainer DEBEZIUM_CONTAINER; + + private static final String DEBEZIUM_DOCKER_IMAGE = "quay.io/debezium/connect:2.3.0.Final"; + + private static final String DEBEZIUM_HOST = "debezium_e2e"; + + private static final int DEBEZIUM_PORT = 8083; + + // ----------------------------------------kafka------------------------------------ + private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:7.0.9"; + private static final String KAFKA_HOST = "kafka_dbz_e2e"; + private KafkaConsumer kafkaConsumer; + private KafkaContainer KAFKA_CONTAINER; + private String KAFKA_TOPIC = "test-debezium-sink"; + + // -------------------------------------mysql--------------------------------------- + private static final String MYSQL_HOST = "mysql"; + private static MySqlContainer MYSQL_CONTAINER; + + // -----------------------------------------postgres----------------------------------- + private static final String PG_IMAGE = "postgres:alpine3.16"; + + private static final int PG_PORT = 5432; + + private static final String PG_DRIVER_JAR = + 
"https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar"; + + private static PostgreSQLContainer POSTGRESQL_CONTAINER; + + @TestContainerExtension + private final ContainerExtendedFactory extendedFactory = + container -> { + Container.ExecResult extraCommands = + container.execInContainer( + "bash", + "-c", + "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O " + + PG_DRIVER_JAR); + Assertions.assertEquals(0, extraCommands.getExitCode()); + }; + + private void createDebeziumContainer() { + DEBEZIUM_CONTAINER = + new GenericContainer<>(DEBEZIUM_DOCKER_IMAGE) + .withCopyFileToContainer( + MountableFile.forClasspathResource("/debezium/register-mysql.json"), + "/tmp/seatunnel/plugins/Jdbc/register-mysql.json") + .withNetwork(NETWORK) + .withNetworkAliases(DEBEZIUM_HOST) + .withExposedPorts(DEBEZIUM_PORT) + .withEnv("GROUP_ID", "1") + .withEnv("CONFIG_STORAGE_TOPIC", "my-connect-configs") + .withEnv("OFFSET_STORAGE_TOPIC", "my-connect-offsets") + .withEnv("STATUS_STORAGE_TOPIC", "my-connect-status") + .withEnv("BOOTSTRAP_SERVERS", KAFKA_HOST + ":9092") + .withLogConsumer( + new Slf4jLogConsumer( + DockerLoggerFactory.getLogger(DEBEZIUM_DOCKER_IMAGE))) + .dependsOn(KAFKA_CONTAINER, MYSQL_CONTAINER); + DEBEZIUM_CONTAINER.setWaitStrategy( + (new HttpWaitStrategy()) + .forPath("/connectors") + .forPort(DEBEZIUM_PORT) + .withStartupTimeout(Duration.ofSeconds(120))); + DEBEZIUM_CONTAINER.setPortBindings( + com.google.common.collect.Lists.newArrayList( + String.format("%s:%s", DEBEZIUM_PORT, DEBEZIUM_PORT))); + } + + private void createKafkaContainer() { + KAFKA_CONTAINER = + new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME)) + .withNetwork(NETWORK) + .withNetworkAliases(KAFKA_HOST) + .withLogConsumer( + new Slf4jLogConsumer( + DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME))); + } + + private void createMysqlContainer() { + MYSQL_CONTAINER = + new MySqlContainer(MySqlVersion.V8_0) + 
.withConfigurationOverride("docker/server-gtids/my.cnf") + .withSetupSQL("docker/setup.sql") + .withNetwork(NETWORK) + .withNetworkAliases(MYSQL_HOST) + .withDatabaseName("debezium") + .withUsername("st_user") + .withPassword("seatunnel") + .withLogConsumer(new Slf4jLogConsumer(LOG)); + } + + private void createPostgreSQLContainer() { + POSTGRESQL_CONTAINER = + new PostgreSQLContainer<>(DockerImageName.parse(PG_IMAGE)) + .withNetwork(NETWORK) + .withNetworkAliases("postgresql_e2e") + .withLogConsumer( + new Slf4jLogConsumer(DockerLoggerFactory.getLogger(PG_IMAGE))); + POSTGRESQL_CONTAINER.setPortBindings( + Lists.newArrayList(String.format("%s:%s", PG_PORT, PG_PORT))); + } + + @BeforeAll + @Override + public void startUp() throws Exception { + LOG.info("The first stage: Starting Kafka containers..."); + createKafkaContainer(); + Startables.deepStart(Stream.of(KAFKA_CONTAINER)).join(); + + LOG.info("The second stage: Starting Mysql containers..."); + createMysqlContainer(); + Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join(); + + LOG.info("The third stage: Starting Debezium Connector containers..."); + createDebeziumContainer(); + Startables.deepStart(Stream.of(DEBEZIUM_CONTAINER)).join(); + + LOG.info("The fourth stage: Starting PostgreSQL container..."); + createPostgreSQLContainer(); + Startables.deepStart(Stream.of(POSTGRESQL_CONTAINER)).join(); + Class.forName(POSTGRESQL_CONTAINER.getDriverClassName()); + + Awaitility.given() + .ignoreExceptions() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(3, TimeUnit.MINUTES) + .untilAsserted(this::initializeSourceTableData); + + given().ignoreExceptions() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(3, TimeUnit.MINUTES) + .untilAsserted(this::initKafkaConsumer); + + given().ignoreExceptions() + .await() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(5, TimeUnit.MINUTES) + 
.untilAsserted(this::initializeSinkJdbcTable); + + Container.ExecResult extraCommand = + DEBEZIUM_CONTAINER.execInContainer( + "bash", + "-c", + "cd /tmp/seatunnel/plugins/Jdbc && curl -i -X POST -H \"Accept:application/json\" -H \"Content-Type:application/json\" http://" + + getLinuxLocalIp() + + ":8083/connectors/ -d @register-mysql.json"); + Assertions.assertEquals(0, extraCommand.getExitCode()); + // ensure debezium has handled the data + Thread.sleep(30 * 1000); + updateSourceTableData(); + Thread.sleep(30 * 1000); + } + + @AfterAll + @Override + public void tearDown() throws Exception { + MYSQL_CONTAINER.close(); + KAFKA_CONTAINER.close(); + DEBEZIUM_CONTAINER.close(); + POSTGRESQL_CONTAINER.close(); + } + + @TestTemplate + public void testKafkaSinkDebeziumFormat(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/kafkasource_debezium_to_kafka.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + ArrayList result = new ArrayList<>(); + kafkaConsumer.subscribe(Lists.newArrayList(KAFKA_TOPIC)); + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + ConsumerRecords consumerRecords = + kafkaConsumer.poll(Duration.ofMillis(1000)); + for (ConsumerRecord record : consumerRecords) { + result.add(record.value()); + } + Assertions.assertEquals(12, result.size()); + }); + } + + @TestTemplate + public void testDebeziumFormatKafkaCdcToPgsql(TestContainer container) + throws IOException, InterruptedException, SQLException { + Container.ExecResult execResult = + container.executeJob("/kafkasource_debezium_cdc_to_pgsql.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + Set> actual = new HashSet<>(); + try (Connection connection = + DriverManager.getConnection( + POSTGRESQL_CONTAINER.getJdbcUrl(), + POSTGRESQL_CONTAINER.getUsername(), + POSTGRESQL_CONTAINER.getPassword())) { + try (Statement statement = 
connection.createStatement()) { + ResultSet resultSet = statement.executeQuery("select * from sink order by id"); + while (resultSet.next()) { + List row = + Arrays.asList( + resultSet.getInt("id"), + resultSet.getString("name"), + resultSet.getString("description"), + resultSet.getString("weight")); + actual.add(row); + } + } + } + Set> expected = + Stream.>of( + Arrays.asList(101, "scooter", "Small 2-wheel scooter", "4.56"), + Arrays.asList(102, "car battery", "12V car battery", "8.1"), + Arrays.asList( + 103, + "12-pack drill bits", + "12-pack of drill bits with sizes ranging from #40 to #3", + "0.8"), + Arrays.asList(104, "hammer", "12oz carpenter's hammer", "0.75"), + Arrays.asList(105, "hammer", "14oz carpenter's hammer", "0.875"), + Arrays.asList(106, "hammer", "16oz carpenter's hammer", "1"), + Arrays.asList(107, "rocks", "box of assorted rocks", "5.3"), + Arrays.asList( + 108, "jacket", "water resistent black wind breaker", "0.1")) + .collect(Collectors.toSet()); + Assertions.assertIterableEquals(expected, actual); + } + + public void initializeSourceTableData() throws Exception { + try (Connection connection = + DriverManager.getConnection( + MYSQL_CONTAINER.getJdbcUrl(), + MYSQL_CONTAINER.getUsername(), + MYSQL_CONTAINER.getPassword()); + Statement statement = connection.createStatement()) { + statement.execute("create database if not exists debezium"); + statement.execute( + "CREATE TABLE if not exists debezium.products (\n" + + " id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,\n" + + " name VARCHAR(255) NOT NULL DEFAULT 'SeaTunnel',\n" + + " description VARCHAR(512),\n" + + " weight VARCHAR(512)\n" + + ");"); + statement.execute( + "INSERT INTO debezium.products\n" + + "VALUES (101,\"scooter\",\"Small 2-wheel scooter\",\"3.14\"),\n" + + " (102,\"car battery\",\"12V car battery\",\"8.1\"),\n" + + " (103,\"12-pack drill bits\",\"12-pack of drill bits with sizes ranging from #40 to #3\"," + + "\"0.8\"),\n" + + " (104,\"hammer\",\"12oz carpenter's 
hammer\",\"0.75\"),\n" + + " (105,\"hammer\",\"14oz carpenter's hammer\",\"0.875\"),\n" + + " (106,\"hammer\",\"16oz carpenter's hammer\",\"1.0\"),\n" + + " (107,\"rocks\",\"box of assorted rocks\",\"5.3\"),\n" + + " (108,\"jacket\",\"water resistent black wind breaker\",\"0.1\"),\n" + + " (109,\"spare tire\",\"24 inch spare tire\",\"22.2\")"); + } + } + + public void updateSourceTableData() throws Exception { + try (Connection connection = + DriverManager.getConnection( + MYSQL_CONTAINER.getJdbcUrl(), + MYSQL_CONTAINER.getUsername(), + MYSQL_CONTAINER.getPassword()); + Statement statement = connection.createStatement()) { + statement.execute( + "UPDATE debezium.products SET weight = '4.56' WHERE name = 'scooter'"); + statement.execute("DELETE FROM debezium.products WHERE name = \"spare tire\""); + } + } + + private void initializeSinkJdbcTable() { + try (Connection connection = + DriverManager.getConnection( + POSTGRESQL_CONTAINER.getJdbcUrl(), + POSTGRESQL_CONTAINER.getUsername(), + POSTGRESQL_CONTAINER.getPassword()); + Statement statement = connection.createStatement()) { + String sink = + "create table sink(\n" + + "id INT NOT NULL PRIMARY KEY,\n" + + "name varchar(255),\n" + + "description varchar(255),\n" + + "weight varchar(255)" + + ")"; + statement.execute(sink); + } catch (SQLException e) { + throw new RuntimeException("Initializing PostgreSql table failed!", e); + } + } + + private void initKafkaConsumer() { + Properties prop = new Properties(); + String bootstrapServers = KAFKA_CONTAINER.getBootstrapServers(); + prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + prop.put( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringDeserializer"); + prop.put( + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringDeserializer"); + prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); + 
prop.put(ConsumerConfig.GROUP_ID_CONFIG, "seatunnel-debezium-sink-group"); + kafkaConsumer = new KafkaConsumer<>(prop); + } + + public String getLinuxLocalIp() { + String ip = ""; + try { + Enumeration networkInterfaces = + NetworkInterface.getNetworkInterfaces(); + while (networkInterfaces.hasMoreElements()) { + NetworkInterface networkInterface = networkInterfaces.nextElement(); + Enumeration inetAddresses = networkInterface.getInetAddresses(); + while (inetAddresses.hasMoreElements()) { + InetAddress inetAddress = inetAddresses.nextElement(); + if (!inetAddress.isLoopbackAddress() && inetAddress instanceof Inet4Address) { + ip = inetAddress.getHostAddress(); + } + } + } + } catch (SocketException ex) { + log.warn("Failed to get linux local ip, it will return [\"\"] ", ex); + } + return ip; + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/debezium/register-mysql.json b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/debezium/register-mysql.json new file mode 100644 index 00000000000..d70e8e0c613 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/debezium/register-mysql.json @@ -0,0 +1,16 @@ +{ + "name": "inventory-connector", + "config": { + "connector.class": "io.debezium.connector.mysql.MySqlConnector", + "tasks.max": "1", + "database.hostname": "mysql", + "database.port": "3306", + "database.user": "st_user", + "database.password": "seatunnel", + "database.server.id": "184054", + "topic.prefix": "dbserver1", + "database.include.list": "debezium", + "schema.history.internal.kafka.bootstrap.servers": "kafka_dbz_e2e:9092", + "schema.history.internal.kafka.topic": "schema-changes.debezium" + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_debezium_cdc_to_pgsql.conf 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_debezium_cdc_to_pgsql.conf new file mode 100644 index 00000000000..a0531b2345a --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_debezium_cdc_to_pgsql.conf @@ -0,0 +1,62 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +###### +###### This config file is a demonstration of batch processing in seatunnel config +###### + +env { + execution.parallelism = 1 + job.mode = "BATCH" + + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + Kafka { + bootstrap.servers = "kafka_dbz_e2e:9092" + topic = "dbserver1.debezium.products" + result_table_name = "kafka_name" + start_mode = earliest + format = debezium_json + schema = { + fields { + id = "int" + name = "string" + description = "string" + weight = "float" + } + } + } +} + +sink { + Jdbc { + driver = org.postgresql.Driver + url = "jdbc:postgresql://postgresql_e2e:5432/test?loggerLevel=OFF" + user = test + password = test + generate_sink_sql = true + database = public + table = sink + primary_keys = ["id"] + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_debezium_to_kafka.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_debezium_to_kafka.conf new file mode 100644 index 00000000000..4944829c24a --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_debezium_to_kafka.conf @@ -0,0 +1,57 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of batch processing in seatunnel config +###### + +env { + execution.parallelism = 1 + job.mode = "BATCH" + + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + Kafka { + bootstrap.servers = "kafka_dbz_e2e:9092" + topic = "dbserver1.debezium.products" + result_table_name = "kafka_name" + start_mode = earliest + format = debezium_json + schema = { + fields { + id = "int" + name = "string" + description = "string" + weight = "float" + } + } + } +} + +sink { + Kafka { + bootstrap.servers = "kafka_dbz_e2e:9092" + topic = "test-debezium-sink" + format = debezium_json + } +} \ No newline at end of file diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonFormatOptions.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonFormatOptions.java index 7b10ad57a63..9ce4dc55411 100644 --- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonFormatOptions.java +++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonFormatOptions.java @@ -24,6 +24,12 @@ import java.util.Map; public class JsonFormatOptions { + public static final Option FAIL_ON_MISSING_FIELD = + Options.key("fail-on-missing-field") + .booleanType() + .defaultValue(false) + .withDescription( + "Optional flag to specify whether to fail if a field is missing or 
not, false by default."); public static final Option IGNORE_PARSE_ERRORS = Options.key("ignore-parse-errors") @@ -33,13 +39,6 @@ public class JsonFormatOptions { "Optional flag to skip fields and rows with parse errors instead of failing;\n" + "fields are set to null in case of errors, false by default."); - public static final Option FAIL_ON_MISSING_FIELD = - Options.key("fail-on-missing-field") - .booleanType() - .defaultValue(false) - .withDescription( - "Optional flag to specify whether to fail if a field is missing or not, false by default."); - public static boolean getFailOnMissingField(Map options) { return Boolean.parseBoolean( options.getOrDefault( diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java new file mode 100644 index 00000000000..3996c4ed7d8 --- /dev/null +++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.format.json.debezium; + +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode; + +import org.apache.seatunnel.api.serialization.DeserializationSchema; +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.type.RowKind; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.format.json.JsonDeserializationSchema; +import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException; + +import java.io.IOException; + +public class DebeziumJsonDeserializationSchema implements DeserializationSchema { + private static final long serialVersionUID = 1L; + + private static final String OP_READ = "r"; // snapshot read + private static final String OP_CREATE = "c"; // insert + private static final String OP_UPDATE = "u"; // update + private static final String OP_DELETE = "d"; // delete + + private static final String REPLICA_IDENTITY_EXCEPTION = + "The \"before\" field of %s message is null, " + + "if you are using Debezium Postgres Connector, " + + "please check the Postgres table has been set REPLICA IDENTITY to FULL level."; + + private final SeaTunnelRowType rowType; + + private final JsonDeserializationSchema jsonDeserializer; + + private final boolean ignoreParseErrors; + + private final boolean debeziumEnabledSchema; + + public DebeziumJsonDeserializationSchema(SeaTunnelRowType rowType, boolean ignoreParseErrors) { + this.rowType = rowType; + this.ignoreParseErrors = ignoreParseErrors; + this.jsonDeserializer = + new JsonDeserializationSchema(false, ignoreParseErrors, createJsonRowType(rowType)); + this.debeziumEnabledSchema = false; + } + + public DebeziumJsonDeserializationSchema( + SeaTunnelRowType rowType, boolean ignoreParseErrors, boolean 
debeziumEnabledSchema) { + this.rowType = rowType; + this.ignoreParseErrors = ignoreParseErrors; + this.jsonDeserializer = + new JsonDeserializationSchema(false, ignoreParseErrors, createJsonRowType(rowType)); + this.debeziumEnabledSchema = debeziumEnabledSchema; + } + + @Override + public SeaTunnelRow deserialize(byte[] message) throws IOException { + throw new UnsupportedOperationException( + "Please invoke DeserializationSchema#deserialize(byte[], Collector) instead."); + } + + @Override + public void deserialize(byte[] message, Collector out) throws IOException { + if (message == null || message.length == 0) { + // skip tombstone messages + return; + } + + try { + JsonNode payload = getPayload(convertBytes(message)); + String op = payload.get("op").asText(); + + if (OP_CREATE.equals(op) || OP_READ.equals(op)) { + SeaTunnelRow insert = convertJsonNode(payload.get("after")); + insert.setRowKind(RowKind.INSERT); + out.collect(insert); + } else if (OP_UPDATE.equals(op)) { + SeaTunnelRow before = convertJsonNode(payload.get("before")); + if (before == null) { + throw new SeaTunnelJsonFormatException( + CommonErrorCode.UNSUPPORTED_DATA_TYPE, + String.format(REPLICA_IDENTITY_EXCEPTION, "UPDATE")); + } + before.setRowKind(RowKind.UPDATE_BEFORE); + out.collect(before); + + SeaTunnelRow after = convertJsonNode(payload.get("after")); + after.setRowKind(RowKind.UPDATE_AFTER); + out.collect(after); + } else if (OP_DELETE.equals(op)) { + SeaTunnelRow delete = convertJsonNode(payload.get("before")); + if (delete == null) { + throw new SeaTunnelJsonFormatException( + CommonErrorCode.UNSUPPORTED_DATA_TYPE, + String.format(REPLICA_IDENTITY_EXCEPTION, "UPDATE")); + } + delete.setRowKind(RowKind.DELETE); + out.collect(delete); + } else { + if (!ignoreParseErrors) { + throw new SeaTunnelJsonFormatException( + CommonErrorCode.UNSUPPORTED_DATA_TYPE, + String.format( + "Unknown \"op\" value \"%s\". 
The Debezium JSON message is '%s'", + op, new String(message))); + } + } + } catch (Throwable t) { + // a big try catch to protect the processing. + if (!ignoreParseErrors) { + throw new SeaTunnelJsonFormatException( + CommonErrorCode.UNSUPPORTED_DATA_TYPE, + String.format("Corrupt Debezium JSON message '%s'.", new String(message)), + t); + } + } + } + + private JsonNode getPayload(JsonNode jsonNode) { + if (debeziumEnabledSchema) { + return jsonNode.get("payload"); + } + return jsonNode; + } + + private JsonNode convertBytes(byte[] message) { + try { + return jsonDeserializer.deserializeToJsonNode(message); + } catch (Exception t) { + if (ignoreParseErrors) { + return null; + } + throw new SeaTunnelJsonFormatException( + CommonErrorCode.JSON_OPERATION_FAILED, + String.format("Failed to deserialize JSON '%s'.", new String(message)), + t); + } + } + + private SeaTunnelRow convertJsonNode(JsonNode root) { + return jsonDeserializer.convertToRowData(root); + } + + @Override + public SeaTunnelDataType getProducedType() { + return this.rowType; + } + + private static SeaTunnelRowType createJsonRowType(SeaTunnelRowType databaseSchema) { + return databaseSchema; + } +} diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonFormatFactory.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonFormatFactory.java new file mode 100644 index 00000000000..e59c9794fb6 --- /dev/null +++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonFormatFactory.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.format.json.debezium; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.serialization.DeserializationSchema; +import org.apache.seatunnel.api.serialization.SerializationSchema; +import org.apache.seatunnel.api.table.connector.DeserializationFormat; +import org.apache.seatunnel.api.table.connector.SerializationFormat; +import org.apache.seatunnel.api.table.factory.DeserializationFormatFactory; +import org.apache.seatunnel.api.table.factory.SerializationFormatFactory; +import org.apache.seatunnel.api.table.factory.TableFactoryContext; + +import java.util.Map; + +public class DebeziumJsonFormatFactory + implements DeserializationFormatFactory, SerializationFormatFactory { + + public static final String IDENTIFIER = "debezium_json"; + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder().build(); + } + + @Override + public SerializationFormat createSerializationFormat(TableFactoryContext context) { + return new SerializationFormat() { + @Override + public SerializationSchema createSerializationSchema() { + return new DebeziumJsonSerializationSchema(null); + } + }; + } + + @Override + public DeserializationFormat createDeserializationFormat(TableFactoryContext context) { + Map options = 
context.getOptions().toMap(); + boolean ignoreParseErrors = DebeziumJsonFormatOptions.getIgnoreParseErrors(options); + boolean schemaInclude = DebeziumJsonFormatOptions.getSchemaInclude(options); + + // TODO config SeaTunnelRowType + return new DeserializationFormat() { + @Override + public DeserializationSchema createDeserializationSchema() { + return new DebeziumJsonDeserializationSchema(null, ignoreParseErrors); + } + }; + } +} diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonFormatOptions.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonFormatOptions.java new file mode 100644 index 00000000000..eb75bfd2b0c --- /dev/null +++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonFormatOptions.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.format.json.debezium; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.format.json.JsonFormatOptions; + +import java.util.Map; + +public class DebeziumJsonFormatOptions { + + public static final int GENERATE_ROW_SIZE = 3; + + public static final Option IGNORE_PARSE_ERRORS = JsonFormatOptions.IGNORE_PARSE_ERRORS; + + public static final Option SCHEMA_INCLUDE = + Options.key("schema-include") + .booleanType() + .defaultValue(false) + .withDescription( + "When setting up a Debezium Kafka Connect, users can enable " + + "a Kafka configuration 'value.converter.schemas.enable' to include schema in the message. " + + "This option indicates the Debezium JSON data include the schema in the message or not. " + + "Default is false."); + + public static boolean getSchemaInclude(Map options) { + return Boolean.parseBoolean( + options.getOrDefault( + SCHEMA_INCLUDE.key(), SCHEMA_INCLUDE.defaultValue().toString())); + } + + public static boolean getIgnoreParseErrors(Map options) { + return Boolean.parseBoolean( + options.getOrDefault( + IGNORE_PARSE_ERRORS.key(), IGNORE_PARSE_ERRORS.defaultValue().toString())); + } +} diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerializationSchema.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerializationSchema.java new file mode 100644 index 00000000000..5b1e476abcd --- /dev/null +++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerializationSchema.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.format.json.debezium; + +import org.apache.seatunnel.api.serialization.SerializationSchema; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.format.json.JsonSerializationSchema; +import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException; + +import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE; +import static org.apache.seatunnel.format.json.debezium.DebeziumJsonFormatOptions.GENERATE_ROW_SIZE; + +public class DebeziumJsonSerializationSchema implements SerializationSchema { + private static final long serialVersionUID = 1L; + + private static final String OP_INSERT = "c"; // insert + private static final String OP_DELETE = "d"; // delete + + private final JsonSerializationSchema jsonSerializer; + + private transient SeaTunnelRow genericRow; + + public DebeziumJsonSerializationSchema(SeaTunnelRowType rowType) { + this.jsonSerializer = new JsonSerializationSchema(createJsonRowType(rowType)); + this.genericRow = new SeaTunnelRow(GENERATE_ROW_SIZE); + } + + @Override + public byte[] serialize(SeaTunnelRow row) { + try { + switch (row.getRowKind()) { + case INSERT: + case UPDATE_AFTER: + 
genericRow.setField(0, null); + genericRow.setField(1, row); + genericRow.setField(2, OP_INSERT); + return jsonSerializer.serialize(genericRow); + case UPDATE_BEFORE: + case DELETE: + genericRow.setField(0, row); + genericRow.setField(1, null); + genericRow.setField(2, OP_DELETE); + return jsonSerializer.serialize(genericRow); + default: + throw new UnsupportedOperationException( + String.format( + "Unsupported operation '%s' for row kind.", row.getRowKind())); + } + } catch (Throwable t) { + throw new SeaTunnelJsonFormatException( + CommonErrorCode.JSON_OPERATION_FAILED, + String.format("Could not serialize row %s.", row), + t); + } + } + + private static SeaTunnelRowType createJsonRowType(SeaTunnelRowType databaseSchema) { + return new SeaTunnelRowType( + new String[] {"before", "after", "op"}, + new SeaTunnelDataType[] {databaseSchema, databaseSchema, STRING_TYPE}); + } +} diff --git a/seatunnel-formats/seatunnel-format-json/src/main/resources/META-INF/services/org.apache.seatunnel.api.table.factory.Factory b/seatunnel-formats/seatunnel-format-json/src/main/resources/META-INF/services/org.apache.seatunnel.api.table.factory.Factory index db11c51c4a2..cedeba7515d 100644 --- a/seatunnel-formats/seatunnel-format-json/src/main/resources/META-INF/services/org.apache.seatunnel.api.table.factory.Factory +++ b/seatunnel-formats/seatunnel-format-json/src/main/resources/META-INF/services/org.apache.seatunnel.api.table.factory.Factory @@ -17,3 +17,4 @@ org.apache.seatunnel.format.json.JsonFormatFactory org.apache.seatunnel.format.json.canal.CanalJsonFormatFactory +org.apache.seatunnel.format.json.debezium.DebeziumJsonFormatFactory diff --git a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerDeSchemaTest.java b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerDeSchemaTest.java new file mode 100644 index 00000000000..20088e525bf --- /dev/null +++ 
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerDeSchemaTest.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.format.json.debezium; + +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.seatunnel.api.table.type.BasicType.FLOAT_TYPE; +import static org.apache.seatunnel.api.table.type.BasicType.INT_TYPE; +import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class DebeziumJsonSerDeSchemaTest { + + private static final SeaTunnelRowType 
PHYSICAL_DATA_TYPE =
+            new SeaTunnelRowType(
+                    new String[] {"id", "name", "description", "weight"},
+                    new SeaTunnelDataType[] {INT_TYPE, STRING_TYPE, STRING_TYPE, FLOAT_TYPE});
+
+    /**
+     * Passes a {@code null} payload and an empty byte array to the deserializer and checks that
+     * neither of them produced a row in the collector.
+     */
+    @Test
+    void testNullRowMessages() throws Exception {
+        DebeziumJsonDeserializationSchema deserializationSchema =
+                new DebeziumJsonDeserializationSchema(PHYSICAL_DATA_TYPE, false);
+        SimpleCollector collector = new SimpleCollector();
+
+        deserializationSchema.deserialize(null, collector);
+        deserializationSchema.deserialize(new byte[0], collector);
+        assertEquals(0, collector.list.size());
+    }
+
+    /** Runs the round trip against {@code debezium-data.txt} with {@code schemaInclude=false}. */
+    @Test
+    public void testSerializationAndSchemaExcludeDeserialization() throws Exception {
+        testSerializationDeserialization("debezium-data.txt", false);
+    }
+
+    /**
+     * Deserializes every line of the given resource, compares the {@code toString()} of the
+     * collected rows with the expected rows, then serializes the collected rows again and
+     * compares the produced JSON with the expected JSON.
+     *
+     * @param resourceFile classpath resource holding one Debezium JSON event per line
+     * @param schemaInclude value forwarded as the third constructor argument of the
+     *     deserialization schema
+     */
+    private void testSerializationDeserialization(String resourceFile, boolean schemaInclude)
+            throws Exception {
+        List<String> lines = readLines(resourceFile);
+        DebeziumJsonDeserializationSchema deserializationSchema =
+                new DebeziumJsonDeserializationSchema(PHYSICAL_DATA_TYPE, true, schemaInclude);
+
+        SimpleCollector collector = new SimpleCollector();
+
+        for (String line : lines) {
+            deserializationSchema.deserialize(line.getBytes(StandardCharsets.UTF_8), collector);
+        }
+
+        // Each "u" event of the fixture is expected as a -U row followed by a +U row.
+        List<String> expectedRows =
+                Arrays.asList(
+                        "SeaTunnelRow{tableId=, kind=+I, fields=[101, scooter, Small 2-wheel scooter, 3.14]}",
+                        "SeaTunnelRow{tableId=, kind=+I, fields=[102, car battery, 12V car battery, 8.1]}",
+                        "SeaTunnelRow{tableId=, kind=+I, fields=[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]}",
+                        "SeaTunnelRow{tableId=, kind=+I, fields=[104, hammer, 12oz carpenter's hammer, 0.75]}",
+                        "SeaTunnelRow{tableId=, kind=+I, fields=[105, hammer, 14oz carpenter's hammer, 0.875]}",
+                        "SeaTunnelRow{tableId=, kind=+I, fields=[106, hammer, 16oz carpenter's hammer, 1.0]}",
+                        "SeaTunnelRow{tableId=, kind=+I, fields=[107, rocks, box of assorted rocks, 5.3]}",
+                        "SeaTunnelRow{tableId=, kind=+I, fields=[108, jacket, water resistent black wind breaker, 0.1]}",
+                        "SeaTunnelRow{tableId=, kind=+I, fields=[109, spare tire, 24 inch spare tire, 22.2]}",
+                        "SeaTunnelRow{tableId=, kind=-U, fields=[106, hammer, 16oz carpenter's hammer, 1.0]}",
+                        "SeaTunnelRow{tableId=, kind=+U, fields=[106, hammer, 18oz carpenter hammer, 1.0]}",
+                        "SeaTunnelRow{tableId=, kind=-U, fields=[107, rocks, box of assorted rocks, 5.3]}",
+                        "SeaTunnelRow{tableId=, kind=+U, fields=[107, rocks, box of assorted rocks, 5.1]}",
+                        "SeaTunnelRow{tableId=, kind=+I, fields=[110, jacket, water resistent white wind breaker, 0.2]}",
+                        "SeaTunnelRow{tableId=, kind=+I, fields=[111, scooter, Big 2-wheel scooter , 5.18]}",
+                        "SeaTunnelRow{tableId=, kind=-U, fields=[110, jacket, water resistent white wind breaker, 0.2]}",
+                        "SeaTunnelRow{tableId=, kind=+U, fields=[110, jacket, new water resistent white wind breaker, 0.5]}",
+                        "SeaTunnelRow{tableId=, kind=-U, fields=[111, scooter, Big 2-wheel scooter , 5.18]}",
+                        "SeaTunnelRow{tableId=, kind=+U, fields=[111, scooter, Big 2-wheel scooter , 5.17]}",
+                        "SeaTunnelRow{tableId=, kind=-D, fields=[111, scooter, Big 2-wheel scooter , 5.17]}");
+        List<String> actualRows =
+                collector.list.stream().map(Object::toString).collect(Collectors.toList());
+        assertEquals(expectedRows, actualRows);
+
+        DebeziumJsonSerializationSchema serializationSchema =
+                new DebeziumJsonSerializationSchema(PHYSICAL_DATA_TYPE);
+
+        List<String> actualJson = new ArrayList<>();
+        for (SeaTunnelRow rowData : collector.list) {
+            actualJson.add(
+                    new String(serializationSchema.serialize(rowData), StandardCharsets.UTF_8));
+        }
+
+        // Position by position against the rows above: +I and +U rows are expected as op "c"
+        // with only "after" set, -U and -D rows as op "d" with only "before" set.
+        List<String> expectedJson =
+                Arrays.asList(
+                        "{\"before\":null,\"after\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel scooter\",\"weight\":3.14},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":102,\"name\":\"car battery\",\"description\":\"12V car battery\",\"weight\":8.1},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":103,\"name\":\"12-pack drill bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to #3\",\"weight\":0.8},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's hammer\",\"weight\":0.75},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's hammer\",\"weight\":0.875},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's hammer\",\"weight\":1.0},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.3},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent black wind breaker\",\"weight\":0.1},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":109,\"name\":\"spare tire\",\"description\":\"24 inch spare tire\",\"weight\":22.2},\"op\":\"c\"}",
+                        "{\"before\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's hammer\",\"weight\":1.0},\"after\":null,\"op\":\"d\"}",
+                        "{\"before\":null,\"after\":{\"id\":106,\"name\":\"hammer\",\"description\":\"18oz carpenter hammer\",\"weight\":1.0},\"op\":\"c\"}",
+                        "{\"before\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.3},\"after\":null,\"op\":\"d\"}",
+                        "{\"before\":null,\"after\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.1},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent white wind breaker\",\"weight\":0.2},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.18},\"op\":\"c\"}",
+                        "{\"before\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent white wind breaker\",\"weight\":0.2},\"after\":null,\"op\":\"d\"}",
+                        "{\"before\":null,\"after\":{\"id\":110,\"name\":\"jacket\",\"description\":\"new water resistent white wind breaker\",\"weight\":0.5},\"op\":\"c\"}",
+                        "{\"before\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.18},\"after\":null,\"op\":\"d\"}",
+                        "{\"before\":null,\"after\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.17},\"op\":\"c\"}",
+                        "{\"before\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.17},\"after\":null,\"op\":\"d\"}");
+        assertEquals(expectedJson, actualJson);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Utilities
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Reads all lines of a classpath resource.
+     *
+     * @param resource resource name, resolved through this test class' class loader
+     * @return the lines of the resource
+     * @throws IOException if the file cannot be read or the resource URL is not a valid URI
+     */
+    private static List<String> readLines(String resource) throws IOException {
+        final URL url = DebeziumJsonSerDeSchemaTest.class.getClassLoader().getResource(resource);
+        Assertions.assertNotNull(url);
+        try {
+            // URL#getFile() keeps percent-encoding (e.g. "%20" for a space), which yields a
+            // non-existent path; converting through the URI decodes it.
+            Path path = new File(url.toURI()).toPath();
+            return Files.readAllLines(path);
+        } catch (java.net.URISyntaxException e) {
+            throw new IOException("Cannot convert resource URL to a file path: " + url, e);
+        }
+    }
+
+    /** Collector that appends every received row to an in-memory list. */
+    private static class SimpleCollector implements Collector<SeaTunnelRow> {
+
+        private final List<SeaTunnelRow> list = new ArrayList<>();
+
+        @Override
+        public void collect(SeaTunnelRow record) {
+            list.add(record);
+        }
+
+        /** Returns {@code null}; this test collector provides no lock object. */
+        @Override
+        public Object getCheckpointLock() {
+            return null;
+        }
+    }
+}
diff --git a/seatunnel-formats/seatunnel-format-json/src/test/resources/debezium-data.txt b/seatunnel-formats/seatunnel-format-json/src/test/resources/debezium-data.txt
new file mode 100644
index 00000000000..3763369e498
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-json/src/test/resources/debezium-data.txt
@@ -0,0 +1,16 @@
+{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606100,"transaction":null}
+{"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.100000381469727},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null} +{"before":null,"after":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.800000011920929},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null} +{"before":null,"after":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null} +{"before":null,"after":{"id":105,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null} +{"before":null,"after":{"id":106,"name":"hammer","description":"16oz carpenter's 
hammer","weight":1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null} +{"before":null,"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null} +{"before":null,"after":{"id":108,"name":"jacket","description":"water resistent black wind breaker","weight":0.10000000149011612},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null} +{"before":null,"after":{"id":109,"name":"spare tire","description":"24 inch spare tire","weight":22.200000762939453},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null} +{"before":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"after":{"id":106,"name":"hammer","description":"18oz carpenter 
hammer","weight":1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589361987000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":362,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589361987936,"transaction":null} +{"before":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.099999904632568},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362099000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362099505,"transaction":null} +{"before":null,"after":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362210000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1068,"row":0,"thread":2,"query":null},"op":"c","ts_ms":1589362210230,"transaction":null} +{"before":null,"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362243000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1394,"row":0,"thread":2,"query":null},"op":"c","ts_ms":1589362243428,"transaction":null} +{"before":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"after":{"id":110,"name":"jacket","description":"new water resistent white wind 
breaker","weight":0.5},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362293000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1707,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362293539,"transaction":null} +{"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362330000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2090,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362330904,"transaction":null} +{"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"after":null,"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362344000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2443,"row":0,"thread":2,"query":null},"op":"d","ts_ms":1589362344455,"transaction":null} \ No newline at end of file