diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/schema/Schema.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/schema/Schema.java index 55f7e26c09a..922633b83a6 100644 --- a/flink-cdc-common/src/main/java/com/ververica/cdc/common/schema/Schema.java +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/schema/Schema.java @@ -199,6 +199,7 @@ public String toString() { } sb.append("}"); sb.append(", primaryKeys=").append(String.join(";", primaryKeys)); + sb.append(", comment=").append(comment); sb.append(", options=").append(describeOptions()); return sb.toString(); diff --git a/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/flink/FlinkPipelineComposerITCase.java index 1e580d731d1..570efe85d1c 100644 --- a/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/flink/FlinkPipelineComposerITCase.java +++ b/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/flink/FlinkPipelineComposerITCase.java @@ -133,7 +133,7 @@ void testSingleSplitSingleTable() throws Exception { String[] outputEvents = outCaptor.toString().trim().split("\n"); assertThat(outputEvents) .containsExactly( - "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, comment=null, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}", @@ -193,8 +193,8 @@ void testSingleSplitMultipleTables() throws Exception { String[] outputEvents = outCaptor.toString().trim().split("\n"); assertThat(outputEvents) .containsExactly( - "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}", - "CreateTableEvent{tableId=default_namespace.default_schema.table2, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, comment=null, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.table2, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, comment=null, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}", diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java index 33372132764..5152651f6a2 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java @@ -51,6 +51,7 @@ import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.CONNECTION_POOL_SIZE; import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.CONNECT_MAX_RETRIES; import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.CONNECT_TIMEOUT; +import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.ENABLE_COLUMN_COMMENTS; import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.HEARTBEAT_INTERVAL; import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME; import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD; @@ -98,6 +99,7 @@ public DataSource createDataSource(Context context) { StartupOptions startupOptions = getStartupOptions(config); boolean includeSchemaChanges = config.get(SCHEMA_CHANGE_ENABLED); + Boolean enableColumnComments = config.get(ENABLE_COLUMN_COMMENTS); int fetchSize = config.get(SCAN_SNAPSHOT_FETCH_SIZE); int splitSize = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE); @@ -157,7 +159,7 @@ public DataSource createDataSource(Context context) { } configFactory.tableList(capturedTables); - return new MySqlDataSource(configFactory); + return new MySqlDataSource(configFactory, enableColumnComments); } @Override @@ -194,6 +196,7 @@ public Set> optionalOptions() { options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); options.add(HEARTBEAT_INTERVAL); options.add(SCHEMA_CHANGE_ENABLED); + options.add(ENABLE_COLUMN_COMMENTS); return options; } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlDataSource.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlDataSource.java index 4828e95eaa2..b8aa6e76b8f 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlDataSource.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlDataSource.java @@ -34,17 +34,21 @@ public class MySqlDataSource implements DataSource { private final MySqlSourceConfigFactory configFactory; private final MySqlSourceConfig sourceConfig; + private final boolean enableColumnComments; - public MySqlDataSource(MySqlSourceConfigFactory configFactory) { + public MySqlDataSource(MySqlSourceConfigFactory configFactory, Boolean enableColumnComments) { this.configFactory = configFactory; this.sourceConfig = configFactory.createConfig(0); + this.enableColumnComments = enableColumnComments; } @Override public EventSourceProvider getEventSourceProvider() { MySqlEventDeserializer deserializer = new MySqlEventDeserializer( - DebeziumChangelogMode.ALL, sourceConfig.isIncludeSchemaChanges()); + DebeziumChangelogMode.ALL, + sourceConfig.isIncludeSchemaChanges(), + enableColumnComments); MySqlSource source = new MySqlSource<>( @@ -52,7 +56,11 @@ public EventSourceProvider getEventSourceProvider() { deserializer, (sourceReaderMetrics, sourceConfig) -> new MySqlPipelineRecordEmitter( - deserializer, sourceReaderMetrics, sourceConfig)); + deserializer, + sourceReaderMetrics, + sourceConfig, + ((MySqlEventDeserializer) deserializer) + .getEnableColumnComments())); return FlinkSourceProvider.of(source); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlDataSourceOptions.java index 018e033b641..f2db82c7271 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlDataSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlDataSourceOptions.java @@ -230,4 +230,12 @@ public class MySqlDataSourceOptions { .defaultValue(true) .withDescription( "Whether send schema change events, by default is true. If set to false, the schema changes will not be sent."); + + @Experimental + public static final ConfigOption ENABLE_COLUMN_COMMENTS = + ConfigOptions.key("column-comments.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether enable column comments, by default is false, if set to true, the column comment will be sent."); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlEventDeserializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlEventDeserializer.java index ec80d14fe2b..3a3dc605cf3 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlEventDeserializer.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlEventDeserializer.java @@ -57,21 +57,25 @@ public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private final boolean includeSchemaChanges; + private final Boolean enableColumnComments; private transient Tables tables; private transient CustomMySqlAntlrDdlParser customParser; public MySqlEventDeserializer( - DebeziumChangelogMode changelogMode, boolean includeSchemaChanges) { + DebeziumChangelogMode changelogMode, + boolean includeSchemaChanges, + Boolean enableColumnComments) { super(new MySqlSchemaDataTypeInference(), changelogMode); this.includeSchemaChanges = includeSchemaChanges; + this.enableColumnComments = enableColumnComments; } @Override protected List deserializeSchemaChangeRecord(SourceRecord record) { if (includeSchemaChanges) { if (customParser == null) { - customParser = new CustomMySqlAntlrDdlParser(); + customParser = new CustomMySqlAntlrDdlParser(enableColumnComments); tables = new Tables(); } @@ -119,6 +123,10 @@ protected Map getMetadata(SourceRecord record) { return Collections.emptyMap(); } + protected boolean getEnableColumnComments() { + return this.enableColumnComments; + } + @Override protected Object convertToString(Object dbzObj, Schema schema) { // the Geometry datatype in MySQL will be converted to diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParser.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParser.java index ed3b430041d..8bdda17dfea 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParser.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParser.java @@ -21,6 +21,7 @@ import io.debezium.antlr.DataTypeResolver; import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; import io.debezium.ddl.parser.mysql.generated.MySqlParser; +import io.debezium.relational.Tables; import java.sql.Types; import java.util.ArrayList; @@ -33,8 +34,9 @@ public class CustomMySqlAntlrDdlParser extends MySqlAntlrDdlParser { private final LinkedList parsedEvents; - public CustomMySqlAntlrDdlParser() { - super(); + public CustomMySqlAntlrDdlParser(boolean enableColumnComments) { + super(true, false, enableColumnComments, null, Tables.TableFilter.includeAll()); + this.parsedEvents = new LinkedList<>(); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java index 4c099efc7e5..4bbc094e9de 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java @@ -61,6 +61,7 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter { private final MySqlSourceConfig sourceConfig; private MySqlAntlrDdlParser mySqlAntlrDdlParser; + private final Boolean enableColumnComments; // Used when startup mode is initial private Set alreadySendCreateTableTables; @@ -72,7 +73,8 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter { public MySqlPipelineRecordEmitter( DebeziumDeserializationSchema debeziumDeserializationSchema, MySqlSourceReaderMetrics sourceReaderMetrics, - MySqlSourceConfig sourceConfig) { + MySqlSourceConfig sourceConfig, + Boolean enableColumnComments) { super( debeziumDeserializationSchema, sourceReaderMetrics, @@ -80,6 +82,7 @@ public MySqlPipelineRecordEmitter( this.sourceConfig = sourceConfig; this.alreadySendCreateTableTables = new HashSet<>(); this.createTableEventCache = new ArrayList<>(); + this.enableColumnComments = enableColumnComments; if (!sourceConfig.getStartupOptions().startupMode.equals(StartupMode.INITIAL)) { try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) { @@ -206,6 +209,7 @@ private Schema parseDDL(String ddlStatement, TableId tableId) { } tableBuilder.physicalColumn(colName, dataType, column.comment()); } + tableBuilder.comment(table.comment()); List primaryKey = table.primaryKeyColumnNames(); if (Objects.nonNull(primaryKey) && !primaryKey.isEmpty()) { @@ -224,7 +228,13 @@ private synchronized Table parseDdl(String ddlStatement, TableId tableId) { private synchronized MySqlAntlrDdlParser getParser() { if (mySqlAntlrDdlParser == null) { - mySqlAntlrDdlParser = new MySqlAntlrDdlParser(); + mySqlAntlrDdlParser = + new MySqlAntlrDdlParser( + true, + false, + enableColumnComments, + null, + Tables.TableFilter.includeAll()); } return mySqlAntlrDdlParser; } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlFullTypesITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlFullTypesITCase.java index a74e62b2fa6..9456b706031 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlFullTypesITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlFullTypesITCase.java @@ -379,7 +379,8 @@ private FlinkSourceProvider getFlinkSourceProvider( .password(database.getPassword()) .serverTimeZone(ZoneId.of("UTC").toString()) .serverId(getServerId(env.getParallelism())); - return (FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider(); + return (FlinkSourceProvider) + new MySqlDataSource(configFactory, false).getEventSourceProvider(); } private static final RowType COMMON_TYPES = diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlPipelineITCase.java index 5e6ed2ff39a..1901d351824 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlPipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlPipelineITCase.java @@ -58,6 +58,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Properties; import java.util.stream.Stream; import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCHEMA_CHANGE_ENABLED; @@ -118,7 +119,8 @@ public void testInitialStartupMode() throws Exception { .includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue()); FlinkSourceProvider sourceProvider = - (FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider(); + (FlinkSourceProvider) + new MySqlDataSource(configFactory, false).getEventSourceProvider(); CloseableIterator events = env.fromSource( sourceProvider.getSource(), @@ -246,7 +248,8 @@ public void testParseAlterStatement() throws Exception { .includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue()); FlinkSourceProvider sourceProvider = - (FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider(); + (FlinkSourceProvider) + new MySqlDataSource(configFactory, false).getEventSourceProvider(); CloseableIterator events = env.fromSource( sourceProvider.getSource(), @@ -317,6 +320,17 @@ private CreateTableEvent getProductsCreateTableEvent(TableId tableId) { .build()); } + private CreateTableEvent getColumnCommentsTable(TableId tableId) { + return new CreateTableEvent( + tableId, + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT().notNull(), "id") + .physicalColumn("name", DataTypes.VARCHAR(255).notNull(), "name") + .primaryKey(Collections.singletonList("id")) + .comment("table which contain column comments") + .build()); + } + private List getSnapshotExpected(TableId tableId) { RowType rowType = RowType.of( @@ -522,4 +536,63 @@ private List executeAlterAndProvideExpected(TableId tableId, Statement st inventoryDatabase.getDatabaseName())); return expected; } + + @Test + public void testColumnCommentsTable() throws Exception { + Properties debeziumProperties = new Properties(); + debeziumProperties.put("include.schema.comments", "true"); + env.setParallelism(1); + inventoryDatabase.createAndInitialize(); + MySqlSourceConfigFactory configFactory = + new MySqlSourceConfigFactory() + .hostname(MYSQL8_CONTAINER.getHost()) + .port(MYSQL8_CONTAINER.getDatabasePort()) + .username(TEST_USER) + .password(TEST_PASSWORD) + .databaseList(inventoryDatabase.getDatabaseName()) + .tableList(inventoryDatabase.getDatabaseName() + "\\.columnCommentsTable") + .startupOptions(StartupOptions.latest()) + .serverId(getServerId(env.getParallelism())) + .debeziumProperties(debeziumProperties) + .serverTimeZone("UTC") + .includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue()); + + FlinkSourceProvider sourceProvider = + (FlinkSourceProvider) + new MySqlDataSource(configFactory, true).getEventSourceProvider(); + CloseableIterator events = + env.fromSource( + sourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + MySqlDataSourceFactory.IDENTIFIER, + new EventTypeInfo()) + .executeAndCollect(); + Thread.sleep(5_000); + + TableId tableId = + TableId.tableId(inventoryDatabase.getDatabaseName(), "columnCommentsTable"); + List expected = new ArrayList<>(); + expected.add(getColumnCommentsTable(tableId)); + try (Connection connection = inventoryDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + + statement.execute( + String.format( + "ALTER TABLE `%s`.`columnCommentsTable` ADD COLUMN (`desc` VARCHAR(45) comment 'desc', `cols2` VARCHAR(55));", + inventoryDatabase.getDatabaseName())); + + expected.add( + new AddColumnEvent( + tableId, + Arrays.asList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn( + "desc", DataTypes.VARCHAR(45), "desc")), + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn( + "cols2", DataTypes.VARCHAR(55)))))); + } + List actual = fetchResults(events, expected.size()); + assertThat(actual).isEqualTo(expected); + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/inventory.sql b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/inventory.sql index 73c8997d5a6..888a5b7a497 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/inventory.sql +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/inventory.sql @@ -68,3 +68,13 @@ VALUES (default, '2016-01-16', 1001, 1, 102), (default, '16-02-21', 1003, 1, 107); +-- create a table which contain column comments +CREATE TABLE columnCommentsTable( + id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY COMMENT 'id', + name varchar(255) NOT NULL COMMENT 'name' +) comment='table which contain column comments' +; + +INSERT INTO columnCommentsTable +VALUES (DEFAULT, 'Sally'), + (DEFAULT, 'George'); \ No newline at end of file diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/test/java/com/ververica/cdc/connectors/values/sink/ValuesDataSinkHelperTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/test/java/com/ververica/cdc/connectors/values/sink/ValuesDataSinkHelperTest.java index 21493c2913e..6f62225edae 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/test/java/com/ververica/cdc/connectors/values/sink/ValuesDataSinkHelperTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/test/java/com/ververica/cdc/connectors/values/sink/ValuesDataSinkHelperTest.java @@ -48,7 +48,7 @@ public void testConvertEventToStr() { List fieldGetters = SchemaUtils.createFieldGetters(schema); Assert.assertEquals( - "CreateTableEvent{tableId=default.default.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}", + "CreateTableEvent{tableId=default.default.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, comment=null, options=()}", ValuesDataSinkHelper.convertEventToStr( new CreateTableEvent(tableId, schema), fieldGetters)); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java index 1e16a0df546..c766d31cc76 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java @@ -199,7 +199,7 @@ public void testFinishedUnackedSplitsUsingStateFromSnapshotPhase() throws Except "+I[2000, user_21, Shanghai, 123567891234]" }; // Step 2: wait the snapshot splits finished reading - Thread.sleep(5000L); + Thread.sleep(10000L); List actualRecords = consumeRecords(reader, dataType); assertEqualsInAnyOrder(Arrays.asList(expectedRecords), actualRecords);