From 4080534df3a7010db096249857db109f38b41c87 Mon Sep 17 00:00:00 2001 From: gong Date: Mon, 28 Aug 2023 19:42:51 +0800 Subject: [PATCH 1/4] [debezium] Improve convertToBinary for RowDataDebeziumDeserializeSchema --- .../cdc/debezium/table/RowDataDebeziumDeserializeSchema.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java index 3e6dfa450cb..238b317d8a8 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java @@ -47,6 +47,7 @@ import java.io.Serializable; import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; @@ -544,6 +545,10 @@ public Object convert(Object dbzObj, Schema schema) { byte[] bytes = new byte[byteBuffer.remaining()]; byteBuffer.get(bytes); return bytes; + } else if (dbzObj instanceof String) { + // debezium.binary.handling.mode = base64 + String data = (String) dbzObj; + return data.getBytes(StandardCharsets.UTF_8); } else { throw new UnsupportedOperationException( "Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName()); From 16445799ff59934f8dc14a64082d38d03e05cee8 Mon Sep 17 00:00:00 2001 From: gong Date: Fri, 1 Dec 2023 17:48:53 +0800 Subject: [PATCH 2/4] Add UT --- .../mysql/table/MySqlConnectorITCase.java | 83 +++++++++++++++++++ .../src/test/resources/ddl/inventory.sql | 19 ++++- 2 files changed, 101 insertions(+), 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java index 3bbbc01f971..658f915ba0c 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java @@ -2269,4 +2269,87 @@ private static void waitForSnapshotStarted(CloseableIterator iterator) thro Thread.sleep(100); } } + + @Test + public void testBinaryHandlingModeWithBase64() throws Exception { + if (!incrementalSnapshot) { + return; + } + inventoryDatabase.createAndInitialize(); + String sourceDDL = + String.format( + "CREATE TABLE varbinary_base64_table (" + + " id INT," + + " order_id STRING," + + " order_date DATE," + + " quantity INT," + + " product_id INT," + + " purchaser STRING," + + " PRIMARY KEY(id) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'mysql-cdc'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'table-name' = '%s'," + + " 'server-time-zone' = 'UTC'," + + " 'server-id' = '%s'," + + " 'scan.incremental.snapshot.chunk.size' = '%s'," + + " 'debezium.binary.handling.mode' = 'base64'" + + ")", + MYSQL_CONTAINER.getHost(), + MYSQL_CONTAINER.getDatabasePort(), + TEST_USER, + TEST_PASSWORD, + inventoryDatabase.getDatabaseName(), + "varbinary_base64_table", + getServerId(), + getSplitSize()); + tEnv.executeSql(sourceDDL); + + // async submit job + TableResult result = tEnv.executeSql("SELECT * FROM varbinary_base64_table"); + + // wait for the source startup, we don't have a better way to wait it, use sleep for now + do { + Thread.sleep(5000L); + } while (result.getJobClient().get().getJobStatus().get() != RUNNING); + + CloseableIterator iterator = result.collect(); + + try (Connection connection = inventoryDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + "INSERT INTO varbinary_base64_table VALUES " + + "(6, b'0000010000000100000001000000010000000100000001000000010000000101','2021-03-08', " + + "30, 500, 'flink');"); + statement.execute( + "INSERT INTO varbinary_base64_table VALUES " + + "(7, b'0000010000000100000001000000010000000100000001000000010000000110','2021-03-08', " + + "30, 500, 'flink-sql');"); + statement.execute( + "UPDATE varbinary_base64_table SET quantity=50 WHERE id=6;"); + statement.execute( + "DELETE FROM varbinary_base64_table WHERE id= 7;"); + } + String[] expected = + new String[] { + // snapshot records + "+I[1, BAQEBAQEBAA=, 2021-03-08, 0, 0, flink]", + "+I[2, BAQEBAQEBAE=, 2021-03-08, 10, 100, flink]", + "+I[3, BAQEBAQEBAI=, 2021-03-08, 20, 200, flink]", + "+I[4, BAQEBAQEBAM=, 2021-03-08, 30, 300, flink]", + "+I[5, BAQEBAQEBAQ=, 2021-03-08, 40, 400, flink]", + // binlog records + "+I[6, BAQEBAQEBAU=, 2021-03-08, 30, 500, flink]", + "+I[7, BAQEBAQEBAY=, 2021-03-08, 30, 500, flink-sql]", + "-U[6, BAQEBAQEBAU=, 2021-03-08, 30, 500, flink]", + "+U[6, BAQEBAQEBAU=, 2021-03-08, 50, 500, flink]", + "-D[7, BAQEBAQEBAY=, 2021-03-08, 30, 500, flink-sql]" + }; + assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length)); + result.getJobClient().get().cancel().get(); + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/inventory.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/inventory.sql index 9fe5cd614ab..ad68ab69546 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/inventory.sql +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/inventory.sql @@ -148,4 +148,21 @@ VALUES ('', 0, 'flink'), ('E', 1, 'flink'), ('E', 2, 'flink'), ('e', 4, 'flink'), - ('E', 3, 'flink'); \ No newline at end of file + ('E', 3, 'flink'); + +CREATE TABLE `varbinary_base64_table` +( + `id` int(11) NOT NULL, + `order_id` varbinary(8) NOT NULL, + `order_date` date NOT NULL, + `quantity` int(11) NOT NULL, + `product_id` int(11) NOT NULL, + `purchaser` varchar(512) NOT NULL, + PRIMARY KEY (`id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8; +INSERT INTO varbinary_base64_table +VALUES (1, b'0000010000000100000001000000010000000100000001000000010000000000', '2021-03-08', 0, 0, 'flink'), + (2, b'0000010000000100000001000000010000000100000001000000010000000001', '2021-03-08', 10, 100, 'flink'), + (3, b'0000010000000100000001000000010000000100000001000000010000000010', '2021-03-08', 20, 200, 'flink'), + (4, b'0000010000000100000001000000010000000100000001000000010000000011', '2021-03-08', 30, 300, 'flink'), + (5, b'0000010000000100000001000000010000000100000001000000010000000100', '2021-03-08', 40, 400, 'flink'); \ No newline at end of file From 479d145d582248552fe111a0c778decd86fea69f Mon Sep 17 00:00:00 2001 From: gong Date: Fri, 1 Dec 2023 18:38:13 +0800 Subject: [PATCH 3/4] mvn spotless:apply --- .../mysql/table/MySqlConnectorITCase.java | 30 +++++++++---------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java index 658f915ba0c..c89480fcc48 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java @@ -2329,25 +2329,23 @@ public void testBinaryHandlingModeWithBase64() throws Exception { "INSERT INTO varbinary_base64_table VALUES " + "(7, b'0000010000000100000001000000010000000100000001000000010000000110','2021-03-08', " + "30, 500, 'flink-sql');"); - statement.execute( - "UPDATE varbinary_base64_table SET quantity=50 WHERE id=6;"); - statement.execute( - "DELETE FROM varbinary_base64_table WHERE id= 7;"); + statement.execute("UPDATE varbinary_base64_table SET quantity=50 WHERE id=6;"); + statement.execute("DELETE FROM varbinary_base64_table WHERE id= 7;"); } String[] expected = new String[] { - // snapshot records - "+I[1, BAQEBAQEBAA=, 2021-03-08, 0, 0, flink]", - "+I[2, BAQEBAQEBAE=, 2021-03-08, 10, 100, flink]", - "+I[3, BAQEBAQEBAI=, 2021-03-08, 20, 200, flink]", - "+I[4, BAQEBAQEBAM=, 2021-03-08, 30, 300, flink]", - "+I[5, BAQEBAQEBAQ=, 2021-03-08, 40, 400, flink]", - // binlog records - "+I[6, BAQEBAQEBAU=, 2021-03-08, 30, 500, flink]", - "+I[7, BAQEBAQEBAY=, 2021-03-08, 30, 500, flink-sql]", - "-U[6, BAQEBAQEBAU=, 2021-03-08, 30, 500, flink]", - "+U[6, BAQEBAQEBAU=, 2021-03-08, 50, 500, flink]", - "-D[7, BAQEBAQEBAY=, 2021-03-08, 30, 500, flink-sql]" + // snapshot records + "+I[1, BAQEBAQEBAA=, 2021-03-08, 0, 0, flink]", + "+I[2, BAQEBAQEBAE=, 2021-03-08, 10, 100, flink]", + "+I[3, BAQEBAQEBAI=, 2021-03-08, 20, 200, flink]", + "+I[4, BAQEBAQEBAM=, 2021-03-08, 30, 300, flink]", + "+I[5, BAQEBAQEBAQ=, 2021-03-08, 40, 400, flink]", + // binlog records + "+I[6, BAQEBAQEBAU=, 2021-03-08, 30, 500, flink]", + "+I[7, BAQEBAQEBAY=, 2021-03-08, 30, 500, flink-sql]", + "-U[6, BAQEBAQEBAU=, 2021-03-08, 30, 500, flink]", + "+U[6, BAQEBAQEBAU=, 2021-03-08, 50, 500, flink]", + "-D[7, BAQEBAQEBAY=, 2021-03-08, 30, 500, flink-sql]" }; assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length)); result.getJobClient().get().cancel().get(); From fac87ed76d168b7c5363a8c36607e3149fa2c5b9 Mon Sep 17 00:00:00 2001 From: gong Date: Sat, 2 Dec 2023 14:28:58 +0800 Subject: [PATCH 4/4] Add docs to describe the user case --- docs/content/connectors/mysql-cdc(ZH).md | 41 ++++++++++++++++++++++++ docs/content/connectors/mysql-cdc.md | 41 ++++++++++++++++++++++++ 2 files changed, 82 insertions(+) diff --git a/docs/content/connectors/mysql-cdc(ZH).md b/docs/content/connectors/mysql-cdc(ZH).md index d21a3bdf26a..9f45144985b 100644 --- a/docs/content/connectors/mysql-cdc(ZH).md +++ b/docs/content/connectors/mysql-cdc(ZH).md @@ -324,6 +324,17 @@ Flink SQL> SELECT * FROM orders; Boolean 是否在快照结束后关闭空闲的 Reader。 此特性需要 flink 版本大于等于 1.14 并且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。 + + debezium.binary.handling.mode + optional + (none) + String + debezium.binary.handling.mode 参数可以设置为以下值: + none:不进行任何处理,直接将二进制数据类型作为字节数组(byte array)传输。 + base64:将二进制数据类型转换为 Base64 编码的字符串,然后传输。 + hex:将二进制数据类型转换为十六进制编码的字符串,然后传输。 + 默认值为 none。根据您的需求和数据类型,您可以选择合适的处理模式。如果您的数据库中包含大量二进制数据类型,建议使用 base64 或 hex 模式,以便在传输过程中更容易处理。 + @@ -709,6 +720,36 @@ $ ./bin/flink run \ * 如果指定的列不存在更新操作,此时可以保证 Exactly once 语义。 * 如果指定的列存在更新操作,此时只能保证 At least once 语义。但可以结合下游,通过指定下游主键,结合幂等性操作来保证数据的正确性。 +### 关于二进制类型数据转换为base64编码数据 + +```sql +CREATE TABLE products ( + db_name STRING METADATA FROM 'database_name' VIRTUAL, + table_name STRING METADATA FROM 'table_name' VIRTUAL, + operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, + order_id INT, + order_date TIMESTAMP(0), + customer_name STRING, + price DECIMAL(10, 5), + product_id INT, + order_status BOOLEAN, + binary_data STRING, + PRIMARY KEY(order_id) NOT ENFORCED +) WITH ( + 'connector' = 'mysql-cdc', + 'hostname' = 'localhost', + 'port' = '3306', + 'username' = 'root', + 'password' = '123456', + 'database-name' = 'test_db', + 'table-name' = 'test_tb', + 'debezium.binary.handling.mode' = 'base64' +); +``` + +`binary_data`字段, 在数据库中的类型是VARBINARY(N),我们在有些场景需要将二进制数据转换为base64编码的字符串数据,可以通过添加参数'debezium.binary.handling.mode' = 'base64'来开启这个功能, +添加此参数的情况下,我们就可以在flink sql中将该字段类型映射为`STRING`,从而获取base64编码的字符串数据。 + 数据类型映射 ---------------- diff --git a/docs/content/connectors/mysql-cdc.md b/docs/content/connectors/mysql-cdc.md index d93d4bf3cd9..79be00ba2db 100644 --- a/docs/content/connectors/mysql-cdc.md +++ b/docs/content/connectors/mysql-cdc.md @@ -329,6 +329,17 @@ During a snapshot operation, the connector will query each included table to pro Boolean Whether to close idle readers at the end of the snapshot phase. The flink version is required to be greater than or equal to 1.14 when 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' is set to true. + + debezium.binary.handling.mode + optional + (none) + String + debezium.binary.handling.mode can be set to one of the following values: + none: No processing is performed, and the binary data type is transmitted as a byte array (byte array). + base64: The binary data type is converted to a Base64-encoded string and transmitted. + hex: The binary data type is converted to a hexadecimal string and transmitted. + The default value is none. Depending on your requirements and data types, you can choose the appropriate processing mode. If your database contains a large number of binary data types, it is recommended to use base64 or hex mode to make it easier to handle during transmission. + @@ -720,6 +731,36 @@ There are two places that need to be taken care of. * If no update operation is performed on the specified column, the exactly-once semantics is ensured. * If the update operation is performed on the specified column, only the at-least-once semantics is ensured. However, you can specify primary keys at downstream and perform the idempotence operation to ensure data correctness. +### About converting binary type data to base64 encoded data + +```sql +CREATE TABLE products ( + db_name STRING METADATA FROM 'database_name' VIRTUAL, + table_name STRING METADATA FROM 'table_name' VIRTUAL, + operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, + order_id INT, + order_date TIMESTAMP(0), + customer_name STRING, + price DECIMAL(10, 5), + product_id INT, + order_status BOOLEAN, + binary_data STRING, + PRIMARY KEY(order_id) NOT ENFORCED +) WITH ( + 'connector' = 'mysql-cdc', + 'hostname' = 'localhost', + 'port' = '3306', + 'username' = 'root', + 'password' = '123456', + 'database-name' = 'test_db', + 'table-name' = 'test_tb', + 'debezium.binary.handling.mode' = 'base64' +); +``` + +`binary_data` field in the database is of type VARBINARY(N). In some scenarios, we need to convert binary data to base64 encoded string data. This feature can be enabled by adding the parameter 'debezium.binary.handling.mode'='base64', +By adding this parameter, we can map the binary field type to 'STRING' in Flink SQL, thereby obtaining base64 encoded string data. + Data Type Mapping ----------------