Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[debezium] Improve convertToBinary for RowDataDebeziumDeserializeSchema #2435

Merged
merged 4 commits into from
Dec 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions docs/content/connectors/mysql-cdc(ZH).md
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,17 @@ Flink SQL> SELECT * FROM orders;
<td>Boolean</td>
<td>是否在快照结束后关闭空闲的 Reader。 此特性需要 flink 版本大于等于 1.14 并且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。</td>
</tr>
<tr>
<td>debezium.binary.handling.mode</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>debezium.binary.handling.mode 参数可以设置为以下值:
none:不进行任何处理,直接将二进制数据类型作为字节数组(byte array)传输。
base64:将二进制数据类型转换为 Base64 编码的字符串,然后传输。
hex:将二进制数据类型转换为十六进制编码的字符串,然后传输。
默认值为 none。根据您的需求和数据类型,您可以选择合适的处理模式。如果您的数据库中包含大量二进制数据类型,建议使用 base64 或 hex 模式,以便在传输过程中更容易处理。
</tr>
</tbody>
</table>
</div>
Expand Down Expand Up @@ -709,6 +720,36 @@ $ ./bin/flink run \
* 如果指定的列不存在更新操作,此时可以保证 Exactly once 语义。
* 如果指定的列存在更新操作,此时只能保证 At least once 语义。但可以结合下游,通过指定下游主键,结合幂等性操作来保证数据的正确性。

### 关于二进制类型数据转换为base64编码数据

```sql
CREATE TABLE products (
db_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
binary_data STRING,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'test_db',
'table-name' = 'test_tb',
'debezium.binary.handling.mode' = 'base64'
);
```

`binary_data`字段, 在数据库中的类型是VARBINARY(N),我们在有些场景需要将二进制数据转换为base64编码的字符串数据,可以通过添加参数'debezium.binary.handling.mode' = 'base64'来开启这个功能,
添加此参数的情况下,我们就可以在flink sql中将该字段类型映射为`STRING`,从而获取base64编码的字符串数据。

数据类型映射
----------------

Expand Down
41 changes: 41 additions & 0 deletions docs/content/connectors/mysql-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,17 @@ During a snapshot operation, the connector will query each included table to pro
<td>Boolean</td>
<td>Whether to close idle readers at the end of the snapshot phase. The flink version is required to be greater than or equal to 1.14 when 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' is set to true.</td>
</tr>
<tr>
<td>debezium.binary.handling.mode</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>debezium.binary.handling.mode can be set to one of the following values:
none: No processing is performed, and the binary data type is transmitted as a byte array (byte array).
base64: The binary data type is converted to a Base64-encoded string and transmitted.
hex: The binary data type is converted to a hexadecimal string and transmitted.
The default value is none. Depending on your requirements and data types, you can choose the appropriate processing mode. If your database contains a large number of binary data types, it is recommended to use base64 or hex mode to make it easier to handle during transmission.</td>
</tr>
</tbody>
</table>
</div>
Expand Down Expand Up @@ -720,6 +731,36 @@ There are two places that need to be taken care of.
* If no update operation is performed on the specified column, the exactly-once semantics is ensured.
* If the update operation is performed on the specified column, only the at-least-once semantics is ensured. However, you can specify primary keys at downstream and perform the idempotence operation to ensure data correctness.

### About converting binary type data to base64 encoded data

```sql
CREATE TABLE products (
db_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
binary_data STRING,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'test_db',
'table-name' = 'test_tb',
'debezium.binary.handling.mode' = 'base64'
);
```

`binary_data` field in the database is of type VARBINARY(N). In some scenarios, we need to convert binary data to base64 encoded string data. This feature can be enabled by adding the parameter 'debezium.binary.handling.mode'='base64',
By adding this parameter, we can map the binary field type to 'STRING' in Flink SQL, thereby obtaining base64 encoded string data.

Data Type Mapping
----------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.io.Serializable;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
Expand Down Expand Up @@ -544,6 +545,10 @@ public Object convert(Object dbzObj, Schema schema) {
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
return bytes;
} else if (dbzObj instanceof String) {
// debezium.binary.handling.mode = base64
String data = (String) dbzObj;
return data.getBytes(StandardCharsets.UTF_8);
} else {
throw new UnsupportedOperationException(
"Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2269,4 +2269,85 @@ private static void waitForSnapshotStarted(CloseableIterator<Row> iterator) thro
Thread.sleep(100);
}
}

@Test
public void testBinaryHandlingModeWithBase64() throws Exception {
if (!incrementalSnapshot) {
return;
}
inventoryDatabase.createAndInitialize();
String sourceDDL =
String.format(
"CREATE TABLE varbinary_base64_table ("
+ " id INT,"
+ " order_id STRING,"
gong marked this conversation as resolved.
Show resolved Hide resolved
+ " order_date DATE,"
+ " quantity INT,"
+ " product_id INT,"
+ " purchaser STRING,"
+ " PRIMARY KEY(id) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'mysql-cdc',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'server-time-zone' = 'UTC',"
+ " 'server-id' = '%s',"
+ " 'scan.incremental.snapshot.chunk.size' = '%s',"
+ " 'debezium.binary.handling.mode' = 'base64'"
+ ")",
MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort(),
TEST_USER,
TEST_PASSWORD,
inventoryDatabase.getDatabaseName(),
"varbinary_base64_table",
getServerId(),
getSplitSize());
tEnv.executeSql(sourceDDL);

// async submit job
TableResult result = tEnv.executeSql("SELECT * FROM varbinary_base64_table");

// wait for the source startup, we don't have a better way to wait it, use sleep for now
do {
Thread.sleep(5000L);
} while (result.getJobClient().get().getJobStatus().get() != RUNNING);

CloseableIterator<Row> iterator = result.collect();

try (Connection connection = inventoryDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
"INSERT INTO varbinary_base64_table VALUES "
+ "(6, b'0000010000000100000001000000010000000100000001000000010000000101','2021-03-08', "
+ "30, 500, 'flink');");
statement.execute(
"INSERT INTO varbinary_base64_table VALUES "
+ "(7, b'0000010000000100000001000000010000000100000001000000010000000110','2021-03-08', "
+ "30, 500, 'flink-sql');");
statement.execute("UPDATE varbinary_base64_table SET quantity=50 WHERE id=6;");
statement.execute("DELETE FROM varbinary_base64_table WHERE id= 7;");
}
String[] expected =
new String[] {
// snapshot records
"+I[1, BAQEBAQEBAA=, 2021-03-08, 0, 0, flink]",
"+I[2, BAQEBAQEBAE=, 2021-03-08, 10, 100, flink]",
"+I[3, BAQEBAQEBAI=, 2021-03-08, 20, 200, flink]",
"+I[4, BAQEBAQEBAM=, 2021-03-08, 30, 300, flink]",
"+I[5, BAQEBAQEBAQ=, 2021-03-08, 40, 400, flink]",
// binlog records
"+I[6, BAQEBAQEBAU=, 2021-03-08, 30, 500, flink]",
"+I[7, BAQEBAQEBAY=, 2021-03-08, 30, 500, flink-sql]",
"-U[6, BAQEBAQEBAU=, 2021-03-08, 30, 500, flink]",
"+U[6, BAQEBAQEBAU=, 2021-03-08, 50, 500, flink]",
"-D[7, BAQEBAQEBAY=, 2021-03-08, 30, 500, flink-sql]"
};
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
result.getJobClient().get().cancel().get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,21 @@ VALUES ('', 0, 'flink'),
('E', 1, 'flink'),
('E', 2, 'flink'),
('e', 4, 'flink'),
('E', 3, 'flink');
('E', 3, 'flink');

CREATE TABLE `varbinary_base64_table`
(
`id` int(11) NOT NULL,
`order_id` varbinary(8) NOT NULL,
`order_date` date NOT NULL,
`quantity` int(11) NOT NULL,
`product_id` int(11) NOT NULL,
`purchaser` varchar(512) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO varbinary_base64_table
VALUES (1, b'0000010000000100000001000000010000000100000001000000010000000000', '2021-03-08', 0, 0, 'flink'),
(2, b'0000010000000100000001000000010000000100000001000000010000000001', '2021-03-08', 10, 100, 'flink'),
(3, b'0000010000000100000001000000010000000100000001000000010000000010', '2021-03-08', 20, 200, 'flink'),
(4, b'0000010000000100000001000000010000000100000001000000010000000011', '2021-03-08', 30, 300, 'flink'),
(5, b'0000010000000100000001000000010000000100000001000000010000000100', '2021-03-08', 40, 400, 'flink');