From da0debb463df5f6927c60fb0aced7bdb540bcc44 Mon Sep 17 00:00:00 2001
From: lvyanquan
Date: Tue, 10 Dec 2024 20:20:21 +0800
Subject: [PATCH] [FLINK-36877][pipeline-connector-kafka] Fix the output of
 canal-json for Kafka Sink when the record is a delete record.

---
 .../kafka/json/canal/CanalJsonSerializationSchema.java       | 4 ++--
 .../kafka/json/canal/CanalJsonSerializationSchemaTest.java   | 2 +-
 .../flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java | 4 ++--
 3 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java
index d0c6179752b..a55bcf80248 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java
@@ -163,8 +163,9 @@ public byte[] serialize(Event event) {
                             .getSerializationSchema()
                             .serialize(reuseGenericRowData);
                 case DELETE:
+                    reuseGenericRowData.setField(0, null);
                     reuseGenericRowData.setField(
-                            0,
+                            1,
                             new GenericArrayData(
                                     new RowData[] {
                                         jsonSerializers
@@ -172,7 +173,6 @@ public byte[] serialize(Event event) {
                                                 .getRowDataFromRecordData(
                                                         dataChangeEvent.before(), false)
                                     }));
-                    reuseGenericRowData.setField(1, null);
                     reuseGenericRowData.setField(2, OP_DELETE);
                     return jsonSerializers
                             .get(dataChangeEvent.tableId())
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java
index 362354c6ecc..13676298237 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java
@@ -111,7 +111,7 @@ public void testSerialize() throws Exception {
                                 }));
        expected =
                mapper.readTree(
-                        "{\"old\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"data\":null,\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"table1\",\"pkNames\":[\"col1\"]}");
+                        "{\"old\":null,\"data\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"table1\",\"pkNames\":[\"col1\"]}");
        actual = mapper.readTree(serializationSchema.serialize(deleteEvent));

        Assertions.assertEquals(expected, actual);
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
index 7936c0ef3e8..ddce0a9e409 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
@@ -356,7 +356,7 @@ void testCanalJsonFormat() throws Exception {
                                        table1.getTableName())),
                        mapper.readTree(
                                String.format(
-                                        "{\"old\":[{\"col1\":\"1\",\"newCol3\":\"1\"}],\"data\":null,\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}",
+                                        "{\"old\":null,\"data\":[{\"col1\":\"1\",\"newCol3\":\"1\"}],\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}",
                                        table1.getTableName())),
                        mapper.readTree(
                                String.format(
@@ -449,7 +449,7 @@ void testHashByKeyPartitionStrategyUsingJson() throws Exception {
                                                table1.toString())),
                                mapper.readTree(
                                        String.format(
-                                                "{\"old\":[{\"col1\":\"1\",\"newCol3\":\"1\"}],\"data\":null,\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]})",
+                                                "{\"old\":null,\"data\":[{\"col1\":\"1\",\"newCol3\":\"1\"}],\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]})",
                                                table1.getTableName()))),
                        Tuple2.of(
                                mapper.readTree(