Skip to content

Commit

Permalink
[Fix] Fix Debezium format cannot parse date/time/timestamp (apache#5887)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hisoka-X authored and chaorongzhi committed Aug 21, 2024
1 parent 1869453 commit d81ef82
Show file tree
Hide file tree
Showing 11 changed files with 641 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ public TableIdentifier getTableId() {
return tableId;
}

public TablePath getTablePath() {
return tableId.toTablePath();
}

public TableSchema getTableSchema() {
return tableSchema;
}
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,66 @@
###### This config file is a demonstration of streaming processing in seatunnel config
######


// The DDL of mysql table
//create table mysql_cdc.mysql_cdc_e2e_source_table
//(
// id int auto_increment
// primary key,
// f_binary binary(64) null,
// f_blob blob null,
// f_long_varbinary mediumblob null,
// f_longblob longblob null,
// f_tinyblob tinyblob null,
// f_varbinary varbinary(100) null,
// f_smallint smallint null,
// f_smallint_unsigned smallint unsigned null,
// f_mediumint mediumint null,
// f_mediumint_unsigned mediumint unsigned null,
// f_int int null,
// f_int_unsigned int unsigned null,
// f_integer int null,
// f_integer_unsigned int unsigned null,
// f_bigint bigint null,
// f_bigint_unsigned bigint unsigned null,
// f_numeric decimal null,
// f_decimal decimal null,
// f_float float null,
// f_double double null,
// f_double_precision double null,
// f_longtext longtext null,
// f_mediumtext mediumtext null,
// f_text text null,
// f_tinytext tinytext null,
// f_varchar varchar(100) null,
// f_date date null,
// f_datetime datetime null,
// f_timestamp timestamp null,
// f_bit1 bit null,
// f_bit64 bit(64) null,
// f_char char null,
// f_enum enum ('enum1', 'enum2', 'enum3') null,
// f_mediumblob mediumblob null,
// f_long_varchar mediumtext null,
// f_real double null,
// f_time time null,
// f_tinyint tinyint null,
// f_tinyint_unsigned tinyint unsigned null,
// f_json json null,
// f_year year null
//);

// The DML of mysql table
// INSERT INTO mysql_cdc.mysql_cdc_e2e_source_table (id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint, f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double, f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime, f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob,
// f_long_varchar, f_real, f_time, f_tinyint, f_tinyint_unsigned, f_json, f_year) VALUES (1, 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000, 0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, null, 0x74696E79626C6F62, 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321, 123456789, 987654321, 123, 789, 12.34, 56.78, 90.12,
// 'This is a long text field', 'This is a medium text field', 'This is a text field', 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00', '2023-04-27 11:08:40', true, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2', 0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field', 12.345, '14:30:00', -128, 255, '{"key": "value"}', 2022);
// INSERT INTO mysql_cdc.mysql_cdc_e2e_source_table (id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint, f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double, f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime, f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob,
// f_long_varchar, f_real, f_time, f_tinyint, f_tinyint_unsigned, f_json, f_year) VALUES (2, 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000, 0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, null, 0x74696E79626C6F62, 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321, 123456789, 987654321, 123, 789, 12.34, 56.78, 90.12,
// 'This is a long text field', 'This is a medium text field', 'This is a text field', 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00', '2023-04-27 11:08:40', true, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2', 0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field', 112.345, '14:30:00', -128, 22, '{"key": "value"}', 2013);
// INSERT INTO mysql_cdc.mysql_cdc_e2e_source_table (id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint, f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double, f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime, f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob,
// f_long_varchar, f_real, f_time, f_tinyint, f_tinyint_unsigned, f_json, f_year) VALUES (3, 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000, 0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, null, 0x74696E79626C6F62, 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321, 123456789, 987654321, 123, 789, 12.34, 56.78, 90.12,
// 'This is a long text field', 'This is a medium text field', 'This is a text field', 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00', '2023-04-27 11:08:40', true, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2', 0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field', 112.345, '14:30:00', -128, 22, '{"key": "value"}', 2021);

env {
execution.parallelism = 1
job.mode = "BATCH"
Expand All @@ -38,10 +98,48 @@ source {
format = debezium_json
schema = {
fields {
id = "int"
name = "string"
description = "string"
weight = "float"
id = "int"
f_binary = "bytes"
f_blob = "bytes"
f_long_varbinary = "bytes"
f_longblob = "bytes"
f_tinyblob = "bytes"
f_varbinary = "string"
f_smallint = "smallint"
f_smallint_unsigned = "int"
f_mediumint = "int"
f_mediumint_unsigned = "int"
f_int = "int"
f_int_unsigned = "bigint"
f_integer = "int"
f_integer_unsigned = "bigint"
f_bigint = "bigint"
f_bigint_unsigned = "decimal(10, 0)"
f_numeric = "decimal(10, 0)"
f_decimal = "decimal(10, 0)"
f_float = "float"
f_double = "double"
f_double_precision = "double"
f_longtext = "string"
f_mediumtext = "string"
f_text = "string"
f_tinytext = "string"
f_varchar = "string"
f_date = "date"
f_datetime = "timestamp"
f_timestamp = "timestamp"
f_bit1 = "boolean"
f_bit64 = "tinyint"
f_char = "string"
f_enum = "string"
f_mediumblob = "bytes"
f_long_varchar = "string"
f_real = "double"
f_time = "time"
f_tinyint = "tinyint"
f_tinyint_unsigned = "int"
f_json = "string"
f_year = "int"
}
}
}
Expand All @@ -55,7 +153,7 @@ sink {
password = test
generate_sink_sql = true
database = test
table = public.sink
table = public.sink2
primary_keys = ["id"]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,47 @@ source {
schema = {
fields {
id = "int"
name = "string"
description = "string"
weight = "float"
f_binary = "bytes"
f_blob = "bytes"
f_long_varbinary = "bytes"
f_longblob = "bytes"
f_tinyblob = "bytes"
f_varbinary = "string"
f_smallint = "smallint"
f_smallint_unsigned = "int"
f_mediumint = "int"
f_mediumint_unsigned = "int"
f_int = "int"
f_int_unsigned = "bigint"
f_integer = "int"
f_integer_unsigned = "bigint"
f_bigint = "bigint"
f_bigint_unsigned = "decimal(10, 0)"
f_numeric = "decimal(10, 0)"
f_decimal = "decimal(10, 0)"
f_float = "float"
f_double = "double"
f_double_precision = "double"
f_longtext = "string"
f_mediumtext = "string"
f_text = "string"
f_tinytext = "string"
f_varchar = "string"
f_date = "date"
f_datetime = "timestamp"
f_timestamp = "timestamp"
f_bit1 = "boolean"
f_bit64 = "tinyint"
f_char = "string"
f_enum = "string"
f_mediumblob = "bytes"
f_long_varchar = "string"
f_real = "double"
f_time = "time"
f_tinyint = "tinyint"
f_tinyint_unsigned = "int"
f_json = "string"
f_year = "int"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ public SeaTunnelRow deserialize(SourceRecord record)
String key = debeziumJsonConverter.serializeKey(record);
String value = debeziumJsonConverter.serializeValue(record);
Object[] fields = new Object[] {record.topic(), key, value};
SeaTunnelRow row = new SeaTunnelRow(fields);
return row;
return new SeaTunnelRow(fields);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

@RequiredArgsConstructor
public class CompatibleDebeziumJsonSerializationSchema implements SerializationSchema {
public static final String IDENTIFIER = CompatibleDebeziumJsonDeserializationSchema.IDENTIFIER;

private final int index;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ public class DebeziumJsonConverter implements Serializable {

private final boolean keySchemaEnable;
private final boolean valueSchemaEnable;
private transient JsonConverter keyConverter;
private transient JsonConverter valueConverter;
private transient volatile JsonConverter keyConverter;
private transient volatile JsonConverter valueConverter;
private transient Method keyConverterMethod;
private transient Method valueConverterMethod;

Expand Down
2 changes: 0 additions & 2 deletions seatunnel-formats/seatunnel-format-json/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
<artifactId>seatunnel-format-json</artifactId>
<name>SeaTunnel : Formats : Json</name>

<properties />

<dependencies>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public class DebeziumJsonDeserializationSchema implements DeserializationSchema<

private final JsonDeserializationSchema jsonDeserializer;

private final DebeziumRowConverter debeziumRowConverter;

private final boolean ignoreParseErrors;

private final boolean debeziumEnabledSchema;
Expand All @@ -60,6 +62,7 @@ public DebeziumJsonDeserializationSchema(SeaTunnelRowType rowType, boolean ignor
this.ignoreParseErrors = ignoreParseErrors;
this.jsonDeserializer =
new JsonDeserializationSchema(false, ignoreParseErrors, createJsonRowType(rowType));
this.debeziumRowConverter = new DebeziumRowConverter(rowType);
this.debeziumEnabledSchema = false;
}

Expand All @@ -69,6 +72,7 @@ public DebeziumJsonDeserializationSchema(
this.ignoreParseErrors = ignoreParseErrors;
this.jsonDeserializer =
new JsonDeserializationSchema(false, ignoreParseErrors, createJsonRowType(rowType));
this.debeziumRowConverter = new DebeziumRowConverter(rowType);
this.debeziumEnabledSchema = debeziumEnabledSchema;
}

Expand Down Expand Up @@ -140,7 +144,7 @@ private JsonNode convertBytes(byte[] message) {
}

private SeaTunnelRow convertJsonNode(JsonNode root) {
return jsonDeserializer.convertToRowData(root);
return debeziumRowConverter.serializeValue(root);
}

@Override
Expand Down
Loading

0 comments on commit d81ef82

Please sign in to comment.