Skip to content

Commit b7632e6

Browse files
committed
add test
1 parent ed149c6 commit b7632e6

File tree

5 files changed

+238
-3
lines changed

5 files changed

+238
-3
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@
4242

4343
import java.time.ZoneId;
4444

45+
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_DEBEZIUM_JSON_SCHEMA_ENABLED;
46+
4547
/** Tests for {@link CanalJsonSerializationSchema}. */
4648
public class CanalJsonSerializationSchemaTest {
4749

@@ -57,7 +59,8 @@ public void testSerialize() throws Exception {
5759
ChangeLogJsonFormatFactory.createSerializationSchema(
5860
new Configuration(),
5961
JsonSerializationType.CANAL_JSON,
60-
ZoneId.systemDefault());
62+
ZoneId.systemDefault(),
63+
SINK_DEBEZIUM_JSON_SCHEMA_ENABLED.defaultValue());
6164
serializationSchema.open(new MockInitializationContext());
6265

6366
// create table

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java

Lines changed: 117 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
package org.apache.flink.cdc.connectors.kafka.json.debezium;
1919

2020
import org.apache.flink.api.common.serialization.SerializationSchema;
21+
import org.apache.flink.cdc.common.data.DecimalData;
22+
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
23+
import org.apache.flink.cdc.common.data.TimestampData;
2124
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
2225
import org.apache.flink.cdc.common.event.CreateTableEvent;
2326
import org.apache.flink.cdc.common.event.DataChangeEvent;
@@ -40,8 +43,12 @@
4043
import org.junit.jupiter.api.Assertions;
4144
import org.junit.jupiter.api.Test;
4245

46+
import java.math.BigDecimal;
47+
import java.time.Instant;
4348
import java.time.ZoneId;
4449

50+
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_DEBEZIUM_JSON_SCHEMA_ENABLED;
51+
4552
/** Tests for {@link DebeziumJsonSerializationSchema}. */
4653
public class DebeziumJsonSerializationSchemaTest {
4754

@@ -57,7 +64,8 @@ public void testSerialize() throws Exception {
5764
ChangeLogJsonFormatFactory.createSerializationSchema(
5865
new Configuration(),
5966
JsonSerializationType.DEBEZIUM_JSON,
60-
ZoneId.systemDefault());
67+
ZoneId.systemDefault(),
68+
SINK_DEBEZIUM_JSON_SCHEMA_ENABLED.defaultValue());
6169
serializationSchema.open(new MockInitializationContext());
6270
// create table
6371
Schema schema =
@@ -129,4 +137,112 @@ public void testSerialize() throws Exception {
129137
actual = mapper.readTree(serializationSchema.serialize(updateEvent));
130138
Assertions.assertEquals(expected, actual);
131139
}
140+
141+
@Test
142+
public void testSerializeWithSchemaAllDataTypes() throws Exception {
143+
ObjectMapper mapper =
144+
JacksonMapperFactory.createObjectMapper()
145+
.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
146+
SerializationSchema<Event> serializationSchema =
147+
ChangeLogJsonFormatFactory.createSerializationSchema(
148+
new Configuration(),
149+
JsonSerializationType.DEBEZIUM_JSON,
150+
ZoneId.systemDefault(),
151+
true);
152+
serializationSchema.open(new MockInitializationContext());
153+
// create table
154+
Schema schema =
155+
Schema.newBuilder()
156+
.physicalColumn("_boolean", DataTypes.BOOLEAN())
157+
.physicalColumn("_binary", DataTypes.BINARY(3))
158+
.physicalColumn("_varbinary", DataTypes.VARBINARY(10))
159+
.physicalColumn("_bytes", DataTypes.BYTES())
160+
.physicalColumn("_tinyint", DataTypes.TINYINT())
161+
.physicalColumn("_smallint", DataTypes.SMALLINT())
162+
.physicalColumn("_int", DataTypes.INT())
163+
.physicalColumn("_bigint", DataTypes.BIGINT())
164+
.physicalColumn("_float", DataTypes.FLOAT())
165+
.physicalColumn("_double", DataTypes.DOUBLE())
166+
.physicalColumn("_decimal", DataTypes.DECIMAL(6, 3))
167+
.physicalColumn("_char", DataTypes.CHAR(5))
168+
.physicalColumn("_varchar", DataTypes.VARCHAR(10))
169+
.physicalColumn("_string", DataTypes.STRING())
170+
.physicalColumn("_date", DataTypes.DATE())
171+
.physicalColumn("_time", DataTypes.TIME())
172+
.physicalColumn("_time_6", DataTypes.TIME(6))
173+
.physicalColumn("_timestamp", DataTypes.TIMESTAMP())
174+
.physicalColumn("_timestamp_3", DataTypes.TIMESTAMP(3))
175+
.physicalColumn("_timestamp_ltz", DataTypes.TIMESTAMP_LTZ())
176+
.physicalColumn("_timestamp_ltz_3", DataTypes.TIMESTAMP_LTZ(3))
177+
.physicalColumn("pt", DataTypes.STRING())
178+
.primaryKey("pt")
179+
.build();
180+
181+
RowType rowType =
182+
RowType.of(
183+
DataTypes.BOOLEAN(),
184+
DataTypes.BINARY(3),
185+
DataTypes.VARBINARY(10),
186+
DataTypes.BYTES(),
187+
DataTypes.TINYINT(),
188+
DataTypes.SMALLINT(),
189+
DataTypes.INT(),
190+
DataTypes.BIGINT(),
191+
DataTypes.FLOAT(),
192+
DataTypes.DOUBLE(),
193+
DataTypes.DECIMAL(6, 3),
194+
DataTypes.CHAR(5),
195+
DataTypes.VARCHAR(10),
196+
DataTypes.STRING(),
197+
DataTypes.DATE(),
198+
DataTypes.TIME(),
199+
DataTypes.TIME(6),
200+
DataTypes.TIMESTAMP(),
201+
DataTypes.TIMESTAMP(3),
202+
DataTypes.TIMESTAMP_LTZ(),
203+
DataTypes.TIMESTAMP_LTZ(3),
204+
DataTypes.STRING());
205+
206+
CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1, schema);
207+
Assertions.assertNull(serializationSchema.serialize(createTableEvent));
208+
BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType);
209+
// insert
210+
DataChangeEvent insertEvent1 =
211+
DataChangeEvent.insertEvent(
212+
TABLE_1,
213+
generator.generate(
214+
new Object[] {
215+
true,
216+
new byte[] {1, 2},
217+
new byte[] {3, 4},
218+
new byte[] {5, 6, 7},
219+
(byte) 1,
220+
(short) 2,
221+
3,
222+
4L,
223+
5.1f,
224+
6.2,
225+
DecimalData.fromBigDecimal(new BigDecimal("7.123"), 6, 3),
226+
BinaryStringData.fromString("test1"),
227+
BinaryStringData.fromString("test2"),
228+
BinaryStringData.fromString("test3"),
229+
100,
230+
200,
231+
300,
232+
TimestampData.fromTimestamp(
233+
java.sql.Timestamp.valueOf("2023-01-01 00:00:00.000")),
234+
TimestampData.fromTimestamp(
235+
java.sql.Timestamp.valueOf("2023-01-01 00:00:00")),
236+
LocalZonedTimestampData.fromInstant(
237+
Instant.parse("2023-01-01T00:00:00.000Z")),
238+
LocalZonedTimestampData.fromInstant(
239+
Instant.parse("2023-01-01T00:00:00.000Z")),
240+
null
241+
}));
242+
JsonNode expected =
243+
mapper.readTree(
244+
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"boolean\",\"optional\":true,\"field\":\"_boolean\"},{\"type\":\"bytes\",\"optional\":true,\"name\":\"io.debezium.data.Bits\",\"version\":1,\"parameters\":{\"length\":\"3\"},\"field\":\"_binary\"},{\"type\":\"string\",\"optional\":true,\"field\":\"_varbinary\"},{\"type\":\"string\",\"optional\":true,\"field\":\"_bytes\"},{\"type\":\"int16\",\"optional\":true,\"field\":\"_tinyint\"},{\"type\":\"int16\",\"optional\":true,\"field\":\"_smallint\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"_int\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"_bigint\"},{\"type\":\"double\",\"optional\":true,\"field\":\"_float\"},{\"type\":\"double\",\"optional\":true,\"field\":\"_double\"},{\"type\":\"bytes\",\"optional\":true,\"name\":\"org.apache.kafka.connect.data.Decimal\",\"version\":1,\"parameters\":{\"scale\":\"3\",\"connect.decimal.precision\":\"6\"},\"field\":\"_decimal\"},{\"type\":\"string\",\"optional\":true,\"field\":\"_char\"},{\"type\":\"string\",\"optional\":true,\"field\":\"_varchar\"},{\"type\":\"string\",\"optional\":true,\"field\":\"_string\"},{\"type\":\"int32\",\"optional\":true,\"name\":\"io.debezium.time.Date\",\"version\":1,\"field\":\"_date\"},{\"type\":\"int64\",\"optional\":true,\"name\":\"io.debezium.time.MicroTime\",\"version\":1,\"field\":\"_time\"},{\"type\":\"int64\",\"optional\":true,\"name\":\"io.debezium.time.MicroTime\",\"version\":1,\"field\":\"_time_6\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.time.Timestamp\",\"version\":1,\"field\":\"_timestamp\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.time.Timestamp\",\"version\":1,\"field\":\"_timestamp_3\"},{\"type\":\"string\",\"optional\":true,\"field\":\"_timestamp_ltz\"},{\"type\":\"string\",\"optional\":true,\"field\":\"_timestamp_ltz_3\"},{\"type\":\"string\",\"optional\":true,\"field\":\"pt\"}],\"optional\":true,\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"boolean\",\"optional\":true,\"field\":\"_boolean\"},{\"type\":\"bytes\",\"optional\":true,\"name\":\"io.debezium.data.Bits\",\"version\":1,\"parameters\":{\"length\":\"3\"},\"field\":\"_binary\"},{\"type\":\"string\",\"optional\":true,\"field\":\"_varbinary\"},{\"type\":\"string\",\"optional\":true,\"field\":\"_bytes\"},{\"type\":\"int16\",\"optional\":true,\"field\":\"_tinyint\"},{\"type\":\"int16\",\"optional\":true,\"field\":\"_smallint\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"_int\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"_bigint\"},{\"type\":\"double\",\"optional\":true,\"field\":\"_float\"},{\"type\":\"double\",\"optional\":true,\"field\":\"_double\"},{\"type\":\"bytes\",\"optional\":true,\"name\":\"org.apache.kafka.connect.data.Decimal\",\"version\":1,\"parameters\":{\"scale\":\"3\",\"connect.decimal.precision\":\"6\"},\"field\":\"_decimal\"},{\"type\":\"string\",\"optional\":true,\"field\":\"_char\"},{\"type\":\"string\",\"optional\":true,\"field\":\"_varchar\"},{\"type\":\"string\",\"optional\":true,\"field\":\"_string\"},{\"type\":\"int32\",\"optional\":true,\"name\":\"io.debezium.time.Date\",\"version\":1,\"field\":\"_date\"},{\"type\":\"int64\",\"optional\":true,\"name\":\"io.debezium.time.MicroTime\",\"version\":1,\"field\":\"_time\"},{\"type\":\"int64\",\"optional\":true,\"name\":\"io.debezium.time.MicroTime\",\"version\":1,\"field\":\"_time_6\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.time.Timestamp\",\"version\":1,\"field\":\"_timestamp\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.time.Timestamp\",\"version\":1,\"field\":\"_timestamp_3\"},{\"type\":\"string\",\"optional\":true,\"field\":\"_timestamp_ltz\"},{\"type\":\"string\",\"optional\":true,\"field\":\"_timestamp_ltz_3\"},{\"type\":\"string\",\"optional\":true,\"field\":\"pt\"}],\"optional\":true,\"field\":\"after\"}],\"optional\":false},\"payload\":{\"before\":null,\"after\":{\"_boolean\":true,\"_binary\":\"AQI=\",\"_varbinary\":\"AwQ=\",\"_bytes\":\"BQYH\",\"_tinyint\":1,\"_smallint\":2,\"_int\":3,\"_bigint\":4,\"_float\":5.1,\"_double\":6.2,\"_decimal\":7.123,\"_char\":\"test1\",\"_varchar\":\"test2\",\"_string\":\"test3\",\"_date\":\"1970-04-11\",\"_time\":\"00:00:00\",\"_time_6\":\"00:00:00\",\"_timestamp\":\"2023-01-01 00:00:00\",\"_timestamp_3\":\"2023-01-01 00:00:00\",\"_timestamp_ltz\":\"2023-01-01 00:00:00Z\",\"_timestamp_ltz_3\":\"2023-01-01 00:00:00Z\",\"pt\":null},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}}");
245+
JsonNode actual = mapper.readTree(serializationSchema.serialize(insertEvent1));
246+
Assertions.assertEquals(expected, actual);
247+
}
132248
}

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToKafkaE2eITCase.java

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,82 @@ public void testSyncWholeDatabaseWithCanalJson() throws Exception {
304304
.containsExactlyInAnyOrderElementsOf(deserializeValues(collectedRecords));
305305
}
306306

307+
@Test
308+
public void testSyncWholeDatabaseWithDebeziumJsonHasSchema() throws Exception {
309+
String pipelineJob =
310+
String.format(
311+
"source:\n"
312+
+ " type: mysql\n"
313+
+ " hostname: %s\n"
314+
+ " port: 3306\n"
315+
+ " username: %s\n"
316+
+ " password: %s\n"
317+
+ " tables: %s.\\.*\n"
318+
+ " server-id: 5400-5404\n"
319+
+ " server-time-zone: UTC\n"
320+
+ "\n"
321+
+ "sink:\n"
322+
+ " type: kafka\n"
323+
+ " properties.bootstrap.servers: kafka:9092\n"
324+
+ " topic: %s\n"
325+
+ " sink.debezium-json-schema.enabled: true\n"
326+
+ "\n"
327+
+ "pipeline:\n"
328+
+ " parallelism: %d",
329+
INTER_CONTAINER_MYSQL_ALIAS,
330+
MYSQL_TEST_USER,
331+
MYSQL_TEST_PASSWORD,
332+
mysqlInventoryDatabase.getDatabaseName(),
333+
topic,
334+
parallelism);
335+
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
336+
Path kafkaCdcJar = TestUtils.getResource("kafka-cdc-pipeline-connector.jar");
337+
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
338+
submitPipelineJob(pipelineJob, mysqlCdcJar, kafkaCdcJar, mysqlDriverJar);
339+
waitUntilJobRunning(Duration.ofSeconds(30));
340+
LOG.info("Pipeline job is running");
341+
List<ConsumerRecord<byte[], byte[]>> collectedRecords = new ArrayList<>();
342+
int expectedEventCount = 13;
343+
waitUntilSpecificEventCount(collectedRecords, expectedEventCount);
344+
List<String> expectedRecords =
345+
getExpectedRecords("expectedEvents/mysqlToKafka/debezium-json-with-schema.txt");
346+
assertThat(expectedRecords).containsAll(deserializeValues(collectedRecords));
347+
LOG.info("Begin incremental reading stage.");
348+
// generate binlogs
349+
String mysqlJdbcUrl =
350+
String.format(
351+
"jdbc:mysql://%s:%s/%s",
352+
MYSQL.getHost(),
353+
MYSQL.getDatabasePort(),
354+
mysqlInventoryDatabase.getDatabaseName());
355+
try (Connection conn =
356+
DriverManager.getConnection(
357+
mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
358+
Statement stat = conn.createStatement()) {
359+
stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
360+
stat.execute("UPDATE products SET weight='5.1' WHERE id=107;");
361+
362+
// modify table schema
363+
stat.execute("ALTER TABLE products ADD COLUMN new_col INT;");
364+
stat.execute(
365+
"INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2, null, null, null, 1);"); // 110
366+
stat.execute(
367+
"INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18, null, null, null, 1);"); // 111
368+
stat.execute(
369+
"UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
370+
stat.execute("UPDATE products SET weight='5.17' WHERE id=111;");
371+
stat.execute("DELETE FROM products WHERE id=111;");
372+
} catch (SQLException e) {
373+
LOG.error("Update table for CDC failed.", e);
374+
throw e;
375+
}
376+
377+
expectedEventCount = 20;
378+
waitUntilSpecificEventCount(collectedRecords, expectedEventCount);
379+
assertThat(expectedRecords)
380+
.containsExactlyInAnyOrderElementsOf(deserializeValues(collectedRecords));
381+
}
382+
307383
private void waitUntilSpecificEventCount(
308384
List<ConsumerRecord<byte[], byte[]>> actualEvent, int expectedCount) throws Exception {
309385
boolean result = false;

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/mysql_inventory.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,4 @@ INSERT INTO customers
5252
VALUES (101,"user_1","Shanghai","123567891234"),
5353
(102,"user_2","Shanghai","123567891234"),
5454
(103,"user_3","Shanghai","123567891234"),
55-
(104,"user_4","Shanghai","123567891234");
55+
(104,"user_4","Shanghai","123567891234");

0 commit comments

Comments
 (0)