Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ public String toString() {
}
sb.append("}");
sb.append(", primaryKeys=").append(String.join(";", primaryKeys));
sb.append(", comment=").append(comment);
sb.append(", options=").append(describeOptions());

return sb.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ void testSingleSplitSingleTable() throws Exception {
String[] outputEvents = outCaptor.toString().trim().split("\n");
assertThat(outputEvents)
.containsExactly(
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, comment=null, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}",
Expand Down Expand Up @@ -193,8 +193,8 @@ void testSingleSplitMultipleTables() throws Exception {
String[] outputEvents = outCaptor.toString().trim().split("\n");
assertThat(outputEvents)
.containsExactly(
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.table2, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, comment=null, options=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.table2, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, comment=null, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.CONNECTION_POOL_SIZE;
import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.CONNECT_MAX_RETRIES;
import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.CONNECT_TIMEOUT;
import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.ENABLE_COLUMN_COMMENTS;
import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.HEARTBEAT_INTERVAL;
import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
Expand Down Expand Up @@ -98,6 +99,7 @@ public DataSource createDataSource(Context context) {
StartupOptions startupOptions = getStartupOptions(config);

boolean includeSchemaChanges = config.get(SCHEMA_CHANGE_ENABLED);
Boolean enableColumnComments = config.get(ENABLE_COLUMN_COMMENTS);

int fetchSize = config.get(SCAN_SNAPSHOT_FETCH_SIZE);
int splitSize = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
Expand Down Expand Up @@ -157,7 +159,7 @@ public DataSource createDataSource(Context context) {
}
configFactory.tableList(capturedTables);

return new MySqlDataSource(configFactory);
return new MySqlDataSource(configFactory, enableColumnComments);
}

@Override
Expand Down Expand Up @@ -194,6 +196,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
options.add(HEARTBEAT_INTERVAL);
options.add(SCHEMA_CHANGE_ENABLED);
options.add(ENABLE_COLUMN_COMMENTS);
return options;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,33 @@ public class MySqlDataSource implements DataSource {

private final MySqlSourceConfigFactory configFactory;
private final MySqlSourceConfig sourceConfig;
private final boolean enableColumnComments;

public MySqlDataSource(MySqlSourceConfigFactory configFactory) {
public MySqlDataSource(MySqlSourceConfigFactory configFactory, Boolean enableColumnComments) {
this.configFactory = configFactory;
this.sourceConfig = configFactory.createConfig(0);
this.enableColumnComments = enableColumnComments;
}

@Override
public EventSourceProvider getEventSourceProvider() {
MySqlEventDeserializer deserializer =
new MySqlEventDeserializer(
DebeziumChangelogMode.ALL, sourceConfig.isIncludeSchemaChanges());
DebeziumChangelogMode.ALL,
sourceConfig.isIncludeSchemaChanges(),
enableColumnComments);

MySqlSource<Event> source =
new MySqlSource<>(
configFactory,
deserializer,
(sourceReaderMetrics, sourceConfig) ->
new MySqlPipelineRecordEmitter(
deserializer, sourceReaderMetrics, sourceConfig));
deserializer,
sourceReaderMetrics,
sourceConfig,
((MySqlEventDeserializer) deserializer)
.getEnableColumnComments()));

return FlinkSourceProvider.of(source);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,4 +230,12 @@ public class MySqlDataSourceOptions {
.defaultValue(true)
.withDescription(
"Whether send schema change events, by default is true. If set to false, the schema changes will not be sent.");

@Experimental
public static final ConfigOption<Boolean> ENABLE_COLUMN_COMMENTS =
ConfigOptions.key("column-comments.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether enable column comments, by default is false, if set to true, the column comment will be sent.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,21 +57,25 @@ public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private final boolean includeSchemaChanges;
private final Boolean enableColumnComments;

private transient Tables tables;
private transient CustomMySqlAntlrDdlParser customParser;

public MySqlEventDeserializer(
DebeziumChangelogMode changelogMode, boolean includeSchemaChanges) {
DebeziumChangelogMode changelogMode,
boolean includeSchemaChanges,
Boolean enableColumnComments) {
super(new MySqlSchemaDataTypeInference(), changelogMode);
this.includeSchemaChanges = includeSchemaChanges;
this.enableColumnComments = enableColumnComments;
}

@Override
protected List<SchemaChangeEvent> deserializeSchemaChangeRecord(SourceRecord record) {
if (includeSchemaChanges) {
if (customParser == null) {
customParser = new CustomMySqlAntlrDdlParser();
customParser = new CustomMySqlAntlrDdlParser(enableColumnComments);
tables = new Tables();
}

Expand Down Expand Up @@ -119,6 +123,10 @@ protected Map<String, String> getMetadata(SourceRecord record) {
return Collections.emptyMap();
}

protected boolean getEnableColumnComments() {
return this.enableColumnComments;
}

@Override
protected Object convertToString(Object dbzObj, Schema schema) {
// the Geometry datatype in MySQL will be converted to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.debezium.antlr.DataTypeResolver;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.ddl.parser.mysql.generated.MySqlParser;
import io.debezium.relational.Tables;

import java.sql.Types;
import java.util.ArrayList;
Expand All @@ -33,8 +34,9 @@ public class CustomMySqlAntlrDdlParser extends MySqlAntlrDdlParser {

private final LinkedList<SchemaChangeEvent> parsedEvents;

public CustomMySqlAntlrDdlParser() {
super();
public CustomMySqlAntlrDdlParser(boolean enableColumnComments) {
super(true, false, enableColumnComments, null, Tables.TableFilter.includeAll());

this.parsedEvents = new LinkedList<>();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter<Event> {

private final MySqlSourceConfig sourceConfig;
private MySqlAntlrDdlParser mySqlAntlrDdlParser;
private final Boolean enableColumnComments;

// Used when startup mode is initial
private Set<TableId> alreadySendCreateTableTables;
Expand All @@ -72,14 +73,16 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter<Event> {
public MySqlPipelineRecordEmitter(
DebeziumDeserializationSchema<Event> debeziumDeserializationSchema,
MySqlSourceReaderMetrics sourceReaderMetrics,
MySqlSourceConfig sourceConfig) {
MySqlSourceConfig sourceConfig,
Boolean enableColumnComments) {
super(
debeziumDeserializationSchema,
sourceReaderMetrics,
sourceConfig.isIncludeSchemaChanges());
this.sourceConfig = sourceConfig;
this.alreadySendCreateTableTables = new HashSet<>();
this.createTableEventCache = new ArrayList<>();
this.enableColumnComments = enableColumnComments;

if (!sourceConfig.getStartupOptions().startupMode.equals(StartupMode.INITIAL)) {
try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
Expand Down Expand Up @@ -206,6 +209,7 @@ private Schema parseDDL(String ddlStatement, TableId tableId) {
}
tableBuilder.physicalColumn(colName, dataType, column.comment());
}
tableBuilder.comment(table.comment());

List<String> primaryKey = table.primaryKeyColumnNames();
if (Objects.nonNull(primaryKey) && !primaryKey.isEmpty()) {
Expand All @@ -224,7 +228,13 @@ private synchronized Table parseDdl(String ddlStatement, TableId tableId) {

private synchronized MySqlAntlrDdlParser getParser() {
if (mySqlAntlrDdlParser == null) {
mySqlAntlrDdlParser = new MySqlAntlrDdlParser();
mySqlAntlrDdlParser =
new MySqlAntlrDdlParser(
true,
false,
enableColumnComments,
null,
Tables.TableFilter.includeAll());
}
return mySqlAntlrDdlParser;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,8 @@ private FlinkSourceProvider getFlinkSourceProvider(
.password(database.getPassword())
.serverTimeZone(ZoneId.of("UTC").toString())
.serverId(getServerId(env.getParallelism()));
return (FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider();
return (FlinkSourceProvider)
new MySqlDataSource(configFactory, false).getEventSourceProvider();
}

private static final RowType COMMON_TYPES =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.stream.Stream;

import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCHEMA_CHANGE_ENABLED;
Expand Down Expand Up @@ -118,7 +119,8 @@ public void testInitialStartupMode() throws Exception {
.includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue());

FlinkSourceProvider sourceProvider =
(FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider();
(FlinkSourceProvider)
new MySqlDataSource(configFactory, false).getEventSourceProvider();
CloseableIterator<Event> events =
env.fromSource(
sourceProvider.getSource(),
Expand Down Expand Up @@ -246,7 +248,8 @@ public void testParseAlterStatement() throws Exception {
.includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue());

FlinkSourceProvider sourceProvider =
(FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider();
(FlinkSourceProvider)
new MySqlDataSource(configFactory, false).getEventSourceProvider();
CloseableIterator<Event> events =
env.fromSource(
sourceProvider.getSource(),
Expand Down Expand Up @@ -317,6 +320,17 @@ private CreateTableEvent getProductsCreateTableEvent(TableId tableId) {
.build());
}

private CreateTableEvent getColumnCommentsTable(TableId tableId) {
return new CreateTableEvent(
tableId,
Schema.newBuilder()
.physicalColumn("id", DataTypes.INT().notNull(), "id")
.physicalColumn("name", DataTypes.VARCHAR(255).notNull(), "name")
.primaryKey(Collections.singletonList("id"))
.comment("table which contain column comments")
.build());
}

private List<Event> getSnapshotExpected(TableId tableId) {
RowType rowType =
RowType.of(
Expand Down Expand Up @@ -522,4 +536,63 @@ private List<Event> executeAlterAndProvideExpected(TableId tableId, Statement st
inventoryDatabase.getDatabaseName()));
return expected;
}

@Test
public void testColumnCommentsTable() throws Exception {
Properties debeziumProperties = new Properties();
debeziumProperties.put("include.schema.comments", "true");
env.setParallelism(1);
inventoryDatabase.createAndInitialize();
MySqlSourceConfigFactory configFactory =
new MySqlSourceConfigFactory()
.hostname(MYSQL8_CONTAINER.getHost())
.port(MYSQL8_CONTAINER.getDatabasePort())
.username(TEST_USER)
.password(TEST_PASSWORD)
.databaseList(inventoryDatabase.getDatabaseName())
.tableList(inventoryDatabase.getDatabaseName() + "\\.columnCommentsTable")
.startupOptions(StartupOptions.latest())
.serverId(getServerId(env.getParallelism()))
.debeziumProperties(debeziumProperties)
.serverTimeZone("UTC")
.includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue());

FlinkSourceProvider sourceProvider =
(FlinkSourceProvider)
new MySqlDataSource(configFactory, true).getEventSourceProvider();
CloseableIterator<Event> events =
env.fromSource(
sourceProvider.getSource(),
WatermarkStrategy.noWatermarks(),
MySqlDataSourceFactory.IDENTIFIER,
new EventTypeInfo())
.executeAndCollect();
Thread.sleep(5_000);

TableId tableId =
TableId.tableId(inventoryDatabase.getDatabaseName(), "columnCommentsTable");
List<Event> expected = new ArrayList<>();
expected.add(getColumnCommentsTable(tableId));
try (Connection connection = inventoryDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {

statement.execute(
String.format(
"ALTER TABLE `%s`.`columnCommentsTable` ADD COLUMN (`desc` VARCHAR(45) comment 'desc', `cols2` VARCHAR(55));",
inventoryDatabase.getDatabaseName()));

expected.add(
new AddColumnEvent(
tableId,
Arrays.asList(
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn(
"desc", DataTypes.VARCHAR(45), "desc")),
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn(
"cols2", DataTypes.VARCHAR(55))))));
}
List<Event> actual = fetchResults(events, expected.size());
assertThat(actual).isEqualTo(expected);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,13 @@ VALUES (default, '2016-01-16', 1001, 1, 102),
(default, '16-02-21', 1003, 1, 107);


-- create a table which contain column comments
CREATE TABLE columnCommentsTable(
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY COMMENT 'id',
name varchar(255) NOT NULL COMMENT 'name'
) comment='table which contain column comments'
;

INSERT INTO columnCommentsTable
VALUES (DEFAULT, 'Sally'),
(DEFAULT, 'George');
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void testConvertEventToStr() {

List<RecordData.FieldGetter> fieldGetters = SchemaUtils.createFieldGetters(schema);
Assert.assertEquals(
"CreateTableEvent{tableId=default.default.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
"CreateTableEvent{tableId=default.default.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, comment=null, options=()}",
ValuesDataSinkHelper.convertEventToStr(
new CreateTableEvent(tableId, schema), fieldGetters));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ public void testFinishedUnackedSplitsUsingStateFromSnapshotPhase() throws Except
"+I[2000, user_21, Shanghai, 123567891234]"
};
// Step 2: wait the snapshot splits finished reading
Thread.sleep(5000L);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should only modify the necessary code, is it necessary?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sometimes 5 seconds cann't finish read, so i increment sleep time to 10 second.

Thread.sleep(10000L);
List<String> actualRecords = consumeRecords(reader, dataType);
assertEqualsInAnyOrder(Arrays.asList(expectedRecords), actualRecords);

Expand Down