Skip to content

Commit

Permalink
[Feature][connector][kafka] Support read debezium format message from…
Browse files Browse the repository at this point in the history
… kafka (apache#5066)
  • Loading branch information
sunxiaojian authored Jul 25, 2023
1 parent e964c03 commit 53a1f0c
Show file tree
Hide file tree
Showing 22 changed files with 1,270 additions and 24 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
timeout-minutes: 90
timeout-minutes: 150
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
Expand Down
107 changes: 107 additions & 0 deletions docs/en/connector-v2/formats/debezium-json.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# Debezium Format

Changelog-Data-Capture Format: Serialization Schema Format: Deserialization Schema

Debezium is a set of distributed services to capture changes in your databases so that your applications can see those changes and respond to them. Debezium records all row-level changes within each database table in a *change event stream*, and applications simply read these streams to see the change events in the same order in which they occurred.

Seatunnel supports to interpret Debezium JSON messages as INSERT/UPDATE/DELETE messages into seatunnel system. This is useful in many cases to leverage this feature, such as

synchronizing incremental data from databases to other systems
auditing logs
real-time materialized views on databases
temporal join changing history of a database table and so on.

Seatunnel also supports to encode the INSERT/UPDATE/DELETE messages in Seatunnel asDebezium JSON messages, and emit to storage like Kafka.

# Format Options

| option | default | required | Description |
|-----------------------------------|---------|----------|------------------------------------------------------------------------------------------------------|
| format | (none) | yes | Specify what format to use, here should be 'debezium_json'. |
| debezium-json.ignore-parse-errors | false | no | Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors. |

# How to use Debezium format

## Kafka uses example

Debezium provides a unified format for changelog, here is a simple example for an update operation captured from a MySQL products table:

```bash
{
"before": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter ",
"weight": 5.18
},
"after": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter ",
"weight": 5.17
},
"source": {
"version": "1.1.1.Final",
"connector": "mysql",
"name": "dbserver1",
"ts_ms": 1589362330000,
"snapshot": "false",
"db": "inventory",
"table": "products",
"server_id": 223344,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 2090,
"row": 0,
"thread": 2,
"query": null
},
"op": "u",
"ts_ms": 1589362330904,
"transaction": null
}
```

Note: please refer to Debezium documentation about the meaning of each fields.

The MySQL products table has 4 columns (id, name, description and weight).
The above JSON message is an update change event on the products table where the weight value of the row with id = 111 is changed from 5.18 to 5.15.
Assuming the messages have been synchronized to Kafka topic products_binlog, then we can use the following Seatunnel conf to consume this topic and interpret the change events by Debezium format.

```bash
env {
execution.parallelism = 1
job.mode = "BATCH"
}

source {
Kafka {
bootstrap.servers = "kafkaCluster:9092"
topic = "products_binlog"
result_table_name = "kafka_name"
start_mode = earliest
schema = {
fields {
id = "int"
name = "string"
description = "string"
weight = "string"
}
}
format = debezium_json
}

}

transform {
}

sink {
Kafka {
bootstrap.servers = "kafkaCluster:9092"
topic = "consume-binlog"
format = debezium_json
}
}
```

16 changes: 10 additions & 6 deletions docs/en/connector-v2/sink/Kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,10 @@ Kafka distinguishes different transactions by different transactionId. This para

### format

Data format. The default format is json. Optional text format. The default field separator is ",".
If you customize the delimiter, add the "field_delimiter" option.
Data format. The default format is json. Optional text format, canal-json and debezium-json.
If you use json or text format. The default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option.
If you use canal format, please refer to [canal-json](../formats/canal-json.md) for details.
If you use debezium format, please refer to [debezium-json](../formats/debezium-json.md) for details.

### field_delimiter

Expand Down Expand Up @@ -209,8 +211,10 @@ sink {

### next version

- [Improve] Support to specify multiple partition keys [3230](https://github.com/apache/seatunnel/pull/3230)
- [Improve] Add text format for kafka sink connector [3711](https://github.com/apache/seatunnel/pull/3711)
- [Improve] Support extract topic from SeaTunnelRow fields [3742](https://github.com/apache/seatunnel/pull/3742)
- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/seatunnel/pull/3719)
- [Improve] Support to specify multiple partition keys [3230](https://github.com/apache/incubator-seatunnel/pull/3230)
- [Improve] Add text format for kafka sink connector [3711](https://github.com/apache/incubator-seatunnel/pull/3711)
- [Improve] Support extract topic from SeaTunnelRow fields [3742](https://github.com/apache/incubator-seatunnel/pull/3742)
- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719)
- [Improve] Support read canal format message [3950](https://github.com/apache/incubator-seatunnel/pull/3950)
- [Improve] Support read debezium format message [3981](https://github.com/apache/incubator-seatunnel/pull/3981)

17 changes: 10 additions & 7 deletions docs/en/connector-v2/source/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,10 @@ The structure of the data, including field names and field types.

## format

Data format. The default format is json. Optional text format. The default field separator is ", ".
If you customize the delimiter, add the "field_delimiter" option.
Data format. The default format is json. Optional text format, canal-json and debezium-json.
If you use json or text format. The default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option.
If you use canal format, please refer to [canal-json](../formats/canal-json.md) for details.
If you use debezium format, please refer to [debezium-json](../formats/debezium-json.md) for details.

## format_error_handle_way

Expand Down Expand Up @@ -221,9 +223,10 @@ source {

### Next Version

- [Improve] Support setting read starting offset or time at startup config ([3157](https://github.com/apache/seatunnel/pull/3157))
- [Improve] Support for dynamic discover topic & partition in streaming mode ([3125](https://github.com/apache/seatunnel/pull/3125))
- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/seatunnel/pull/3719)
- [Bug] Fixed the problem that parsing the offset format failed when the startup mode was offset([3810](https://github.com/apache/seatunnel/pull/3810))
- [Feature] Kafka source supports data deserialization failure skipping([4364](https://github.com/apache/seatunnel/pull/4364))
- [Improve] Support setting read starting offset or time at startup config ([3157](https://github.com/apache/incubator-seatunnel/pull/3157))
- [Improve] Support for dynamic discover topic & partition in streaming mode ([3125](https://github.com/apache/incubator-seatunnel/pull/3125))
- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719)
- [Bug] Fixed the problem that parsing the offset format failed when the startup mode was offset([3810](https://github.com/apache/incubator-seatunnel/pull/3810))
- [Improve] Support read canal format message [3950](https://github.com/apache/incubator-seatunnel/pull/3950)
- [Improve] Support read debezium format message [3981](https://github.com/apache/incubator-seatunnel/pull/3981)

12 changes: 11 additions & 1 deletion release-note.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,19 @@
## Bug fix

### Core

- [Core] [API] Fixed generic class loss for lists (#4421)
- [Core] [API] Fix parse nested row data type key changed upper (#4459)
- [Starter][Flink]Support transform-v2 for flink #3396
- [Flink] Support flink 1.14.x #3963
### Transformer
- [Spark] Support transform-v2 for spark (#3409)
- [ALL]Add FieldMapper Transform #3781
### Connectors
- [Elasticsearch] Support https protocol & compatible with opensearch
- [Hbase] Add hbase sink connector #4049
### Formats
- [Canal]Support read canal format message #3950
- [Debezium]Support debezium canal format message #3981

### Connector-V2

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
public class Config {

public static final String CONNECTOR_IDENTITY = "Kafka";
public static final String REPLICATION_FACTOR = "replication.factor";

/** The default field delimiter is “,” */
public static final String DEFAULT_FIELD_DELIMITER = ",";

Expand Down Expand Up @@ -99,6 +97,12 @@ public class Config {
"Data format. The default format is json. Optional text format. The default field separator is \", \". "
+ "If you customize the delimiter, add the \"field_delimiter\" option.");

public static final Option<Boolean> DEBEZIUM_RECORD_INCLUDE_SCHEMA =
Options.key("debezium_record_include_schema")
.booleanType()
.defaultValue(true)
.withDescription("Does the debezium record carry a schema.");

public static final Option<String> FIELD_DELIMITER =
Options.key("field_delimiter")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ public enum MessageFormat {
JSON,
TEXT,
CANAL_JSON,
DEBEZIUM_JSON,
COMPATIBLE_DEBEZIUM_JSON
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonSerializationSchema;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
import org.apache.seatunnel.format.json.canal.CanalJsonSerializationSchema;
import org.apache.seatunnel.format.json.debezium.DebeziumJsonSerializationSchema;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
import org.apache.seatunnel.format.text.TextSerializationSchema;

Expand Down Expand Up @@ -219,6 +220,8 @@ private static SerializationSchema createSerializationSchema(
.build();
case CANAL_JSON:
return new CanalJsonSerializationSchema(rowType);
case DEBEZIUM_JSON:
return new DebeziumJsonSerializationSchema(rowType);
case COMPATIBLE_DEBEZIUM_JSON:
return new CompatibleDebeziumJsonSerializationSchema(rowType, isKey);
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
import org.apache.seatunnel.format.json.debezium.DebeziumJsonDeserializationSchema;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
import org.apache.seatunnel.format.text.TextDeserializationSchema;
import org.apache.seatunnel.format.text.constant.TextFormatConstant;
Expand All @@ -62,6 +63,7 @@
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.COMMIT_ON_CHECKPOINT;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONSUMER_GROUP;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEBEZIUM_RECORD_INCLUDE_SCHEMA;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FIELD_DELIMITER;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
Expand Down Expand Up @@ -266,6 +268,14 @@ private void setDeserialization(Config config) {
.setIgnoreParseErrors(true)
.build();
break;
case DEBEZIUM_JSON:
boolean includeSchema = DEBEZIUM_RECORD_INCLUDE_SCHEMA.defaultValue();
if (config.hasPath(DEBEZIUM_RECORD_INCLUDE_SCHEMA.key())) {
includeSchema = config.getBoolean(DEBEZIUM_RECORD_INCLUDE_SCHEMA.key());
}
deserializationSchema =
new DebeziumJsonDeserializationSchema(typeInfo, true, includeSchema);
break;
default:
throw new SeaTunnelJsonFormatException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + format);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public OptionRule optionRule() {
Config.KAFKA_CONFIG,
Config.SCHEMA,
Config.FORMAT,
Config.DEBEZIUM_RECORD_INCLUDE_SCHEMA,
Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS)
.conditional(Config.START_MODE, StartMode.TIMESTAMP, Config.START_MODE_TIMESTAMP)
.conditional(
Expand Down
Loading

0 comments on commit 53a1f0c

Please sign in to comment.