Skip to content

Commit edc048f

Browse files
junmuzymuzammil
authored andcommitted
Reducing the changes
1 parent 8f378d3 commit edc048f

File tree

2 files changed

+7
-12
lines changed

2 files changed

+7
-12
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,9 @@ public EventSourceProvider getEventSourceProvider() {
6767
RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS.name(),
6868
false);
6969

70-
DebeziumChangelogMode changelogMode = DebeziumChangelogMode.ALL;
71-
7270
MySqlEventDeserializer deserializer =
7371
new MySqlEventDeserializer(
74-
changelogMode,
72+
DebeziumChangelogMode.ALL,
7573
sourceConfig.isIncludeSchemaChanges(),
7674
readableMetadataList,
7775
includeComments,

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.flink.cdc.connectors.mysql.schema.MySqlTableDefinition;
2727
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
2828
import org.apache.flink.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics;
29-
import org.apache.flink.cdc.connectors.mysql.source.parser.CustomMySqlAntlrDdlParser;
3029
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
3130
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
3231
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitState;
@@ -37,6 +36,7 @@
3736
import org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema;
3837
import org.apache.flink.connector.base.source.reader.RecordEmitter;
3938

39+
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
4040
import io.debezium.jdbc.JdbcConnection;
4141
import io.debezium.relational.Column;
4242
import io.debezium.relational.RelationalDatabaseConnectorConfig;
@@ -72,7 +72,7 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter<Event> {
7272
private static final Logger LOG = LoggerFactory.getLogger(MySqlPipelineRecordEmitter.class);
7373

7474
private final MySqlSourceConfig sourceConfig;
75-
private CustomMySqlAntlrDdlParser mySqlAntlrDdlParser;
75+
private MySqlAntlrDdlParser mySqlAntlrDdlParser;
7676

7777
// Used when startup mode is initial
7878
private Set<TableId> alreadySendCreateTableTables;
@@ -283,14 +283,14 @@ private Schema buildSchemaFromTable(Table table) {
283283
}
284284

285285
private synchronized Table parseDdl(String ddlStatement, TableId tableId) {
286-
CustomMySqlAntlrDdlParser mySqlAntlrDdlParser = getParser();
286+
MySqlAntlrDdlParser mySqlAntlrDdlParser = getParser();
287287
mySqlAntlrDdlParser.setCurrentDatabase(tableId.catalog());
288288
Tables tables = new Tables();
289289
mySqlAntlrDdlParser.parse(ddlStatement, tables);
290290
return tables.forTable(tableId);
291291
}
292292

293-
private synchronized CustomMySqlAntlrDdlParser getParser() {
293+
private synchronized MySqlAntlrDdlParser getParser() {
294294
if (mySqlAntlrDdlParser == null) {
295295
boolean includeComments =
296296
sourceConfig
@@ -301,11 +301,8 @@ private synchronized CustomMySqlAntlrDdlParser getParser() {
301301
false);
302302
boolean appendOnly = sourceConfig.isScanReadChangelogAsAppendOnly();
303303
mySqlAntlrDdlParser =
304-
new CustomMySqlAntlrDdlParser(
305-
includeComments,
306-
sourceConfig.isTreatTinyInt1AsBoolean(),
307-
false, // isTableIdCaseInsensitive - using false as default
308-
appendOnly);
304+
new MySqlAntlrDdlParser(
305+
true, false, includeComments, null, Tables.TableFilter.includeAll());
309306
}
310307
return mySqlAntlrDdlParser;
311308
}

0 commit comments

Comments
 (0)