Skip to content

Commit

Permalink
[FLINK-36877][pipeline-connector-kafka] Fix the output of canal-json …
Browse files Browse the repository at this point in the history
…for Kafka Sink when the record is a delete record.
  • Loading branch information
lvyanquan committed Dec 10, 2024
1 parent c2e3012 commit da0debb
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,16 +163,16 @@ public byte[] serialize(Event event) {
.getSerializationSchema()
.serialize(reuseGenericRowData);
case DELETE:
reuseGenericRowData.setField(0, null);
reuseGenericRowData.setField(
0,
1,
new GenericArrayData(
new RowData[] {
jsonSerializers
.get(dataChangeEvent.tableId())
.getRowDataFromRecordData(
dataChangeEvent.before(), false)
}));
reuseGenericRowData.setField(1, null);
reuseGenericRowData.setField(2, OP_DELETE);
return jsonSerializers
.get(dataChangeEvent.tableId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void testSerialize() throws Exception {
}));
expected =
mapper.readTree(
"{\"old\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"data\":null,\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"table1\",\"pkNames\":[\"col1\"]}");
"{\"old\":null,\"data\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"table1\",\"pkNames\":[\"col1\"]}");
actual = mapper.readTree(serializationSchema.serialize(deleteEvent));
Assertions.assertEquals(expected, actual);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ void testCanalJsonFormat() throws Exception {
table1.getTableName())),
mapper.readTree(
String.format(
"{\"old\":[{\"col1\":\"1\",\"newCol3\":\"1\"}],\"data\":null,\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}",
"{\"old\":null,\"data\":[{\"col1\":\"1\",\"newCol3\":\"1\"}],\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}",
table1.getTableName())),
mapper.readTree(
String.format(
Expand Down Expand Up @@ -449,7 +449,7 @@ void testHashByKeyPartitionStrategyUsingJson() throws Exception {
table1.toString())),
mapper.readTree(
String.format(
"{\"old\":[{\"col1\":\"1\",\"newCol3\":\"1\"}],\"data\":null,\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]})",
"{\"old\":null,\"data\":[{\"col1\":\"1\",\"newCol3\":\"1\"}],\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]})",
table1.getTableName()))),
Tuple2.of(
mapper.readTree(
Expand Down

0 comments on commit da0debb

Please sign in to comment.