Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-36282][pipeline-connector][cdc-connector][mysql] Fix incorrect data type of TINYINT(1) in MySQL pipeline connector #3608

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ public MySqlDataSource(MySqlSourceConfigFactory configFactory) {
public EventSourceProvider getEventSourceProvider() {
MySqlEventDeserializer deserializer =
new MySqlEventDeserializer(
DebeziumChangelogMode.ALL, sourceConfig.isIncludeSchemaChanges());
DebeziumChangelogMode.ALL,
sourceConfig.isIncludeSchemaChanges(),
sourceConfig.getJdbcProperties());
Comment on lines +48 to +50
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering whether we need to pass the whole JdbcProperties into the deserializer, since tinyInt1isBit is the only key that is relevant to type deserialization. Maybe passing a boolean tinyInt1isBit would be enough?


MySqlSource<Event> source =
new MySqlSource<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.getHistoryRecord;

Expand All @@ -59,21 +60,25 @@ public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private final boolean includeSchemaChanges;
private final Properties jdbcProperties;

private transient Tables tables;
private transient CustomMySqlAntlrDdlParser customParser;

public MySqlEventDeserializer(
DebeziumChangelogMode changelogMode, boolean includeSchemaChanges) {
DebeziumChangelogMode changelogMode,
boolean includeSchemaChanges,
Properties jdbcProperties) {
super(new MySqlSchemaDataTypeInference(), changelogMode);
this.includeSchemaChanges = includeSchemaChanges;
this.jdbcProperties = jdbcProperties;
}

@Override
protected List<SchemaChangeEvent> deserializeSchemaChangeRecord(SourceRecord record) {
if (includeSchemaChanges) {
if (customParser == null) {
customParser = new CustomMySqlAntlrDdlParser();
customParser = new CustomMySqlAntlrDdlParser(jdbcProperties);
tables = new Tables();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import static org.apache.flink.cdc.connectors.mysql.utils.MySqlTypeUtils.fromDbzColumn;
Expand All @@ -60,6 +61,7 @@ public class CustomAlterTableParserListener extends MySqlParserBaseListener {
private final MySqlAntlrDdlParser parser;
private final List<ParseTreeListener> listeners;
private final LinkedList<SchemaChangeEvent> changes;
private final Properties jdbcProperties;
private org.apache.flink.cdc.common.event.TableId currentTable;
private List<ColumnEditor> columnEditors;
private CustomColumnDefinitionParserListener columnDefinitionListener;
Expand All @@ -70,10 +72,12 @@ public class CustomAlterTableParserListener extends MySqlParserBaseListener {
public CustomAlterTableParserListener(
MySqlAntlrDdlParser parser,
List<ParseTreeListener> listeners,
LinkedList<SchemaChangeEvent> changes) {
LinkedList<SchemaChangeEvent> changes,
Properties jdbcProperties) {
this.parser = parser;
this.listeners = listeners;
this.changes = changes;
this.jdbcProperties = jdbcProperties;
}

@Override
Expand Down Expand Up @@ -315,7 +319,7 @@ public void exitAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx)
String newColumnName = parser.parseName(ctx.newColumn);

Map<String, DataType> typeMapping = new HashMap<>();
typeMapping.put(column.name(), fromDbzColumn(column));
typeMapping.put(column.name(), fromDbzColumn(column, jdbcProperties));
changes.add(new AlterColumnTypeEvent(currentTable, typeMapping));

if (newColumnName != null && !column.name().equalsIgnoreCase(newColumnName)) {
Expand Down Expand Up @@ -366,7 +370,7 @@ public void exitAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx)
() -> {
Column column = columnDefinitionListener.getColumn();
Map<String, DataType> typeMapping = new HashMap<>();
typeMapping.put(column.name(), fromDbzColumn(column));
typeMapping.put(column.name(), fromDbzColumn(column, jdbcProperties));
changes.add(new AlterColumnTypeEvent(currentTable, typeMapping));
listeners.remove(columnDefinitionListener);
},
Expand Down Expand Up @@ -413,7 +417,7 @@ public void exitDropTable(MySqlParser.DropTableContext ctx) {
private org.apache.flink.cdc.common.schema.Column toCdcColumn(Column dbzColumn) {
return org.apache.flink.cdc.common.schema.Column.physicalColumn(
dbzColumn.name(),
fromDbzColumn(dbzColumn),
fromDbzColumn(dbzColumn, jdbcProperties),
dbzColumn.comment(),
dbzColumn.defaultValueExpression().orElse(null));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,18 @@
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;

/** A DDL parser that uses a custom listener. */
public class CustomMySqlAntlrDdlParser extends MySqlAntlrDdlParser {

private final LinkedList<SchemaChangeEvent> parsedEvents;
private final Properties jdbcProperties;

public CustomMySqlAntlrDdlParser() {
public CustomMySqlAntlrDdlParser(Properties jdbcProperties) {
super();
this.parsedEvents = new LinkedList<>();
this.jdbcProperties = jdbcProperties;
}

// Overriding this method because the BIT type requires a default length dimension of 1.
Expand Down Expand Up @@ -277,7 +280,7 @@ protected DataTypeResolver initializeDataTypeResolver() {

@Override
protected AntlrDdlParserListener createParseTreeWalkerListener() {
return new CustomMySqlAntlrDdlParserListener(this, parsedEvents);
return new CustomMySqlAntlrDdlParserListener(this, parsedEvents, jdbcProperties);
}

public List<SchemaChangeEvent> getAndClearParsedEvents() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CopyOnWriteArrayList;

/**
Expand Down Expand Up @@ -74,12 +75,16 @@ public class CustomMySqlAntlrDdlParserListener extends MySqlParserBaseListener
private final Collection<ParsingException> errors = new ArrayList<>();

public CustomMySqlAntlrDdlParserListener(
MySqlAntlrDdlParser parser, LinkedList<SchemaChangeEvent> parsedEvents) {
MySqlAntlrDdlParser parser,
LinkedList<SchemaChangeEvent> parsedEvents,
Properties jdbcProperties) {
// initialize listeners
listeners.add(new CreateAndAlterDatabaseParserListener(parser));
listeners.add(new DropDatabaseParserListener(parser));
listeners.add(new CreateTableParserListener(parser, listeners));
listeners.add(new CustomAlterTableParserListener(parser, listeners, parsedEvents));
listeners.add(
new CustomAlterTableParserListener(
parser, listeners, parsedEvents, jdbcProperties));
listeners.add(new DropTableParserListener(parser));
listeners.add(new RenameTableParserListener(parser));
listeners.add(new TruncateTableParserListener(parser));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ private Schema parseDDL(String ddlStatement, TableId tableId) {
Column column = columns.get(i);

String colName = column.name();
DataType dataType = MySqlTypeUtils.fromDbzColumn(column);
DataType dataType =
MySqlTypeUtils.fromDbzColumn(column, sourceConfig.getJdbcProperties());
if (!column.isOptional()) {
dataType = dataType.notNull();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createMySqlConnection;
Expand Down Expand Up @@ -129,14 +130,14 @@ public static Schema getTableSchema(
new MySqlSchema(sourceConfig, jdbc.isTableIdCaseSensitive())) {
TableChanges.TableChange tableSchema =
mySqlSchema.getTableSchema(partition, jdbc, toDbzTableId(tableId));
return toSchema(tableSchema.getTable());
return toSchema(tableSchema.getTable(), sourceConfig.getJdbcProperties());
}
}

public static Schema toSchema(Table table) {
public static Schema toSchema(Table table, Properties jdbcProperties) {
List<Column> columns =
table.columns().stream()
.map(MySqlSchemaUtils::toColumn)
.map(column -> toColumn(column, jdbcProperties))
.collect(Collectors.toList());

return Schema.newBuilder()
Expand All @@ -146,9 +147,11 @@ public static Schema toSchema(Table table) {
.build();
}

public static Column toColumn(io.debezium.relational.Column column) {
public static Column toColumn(io.debezium.relational.Column column, Properties jdbcProperties) {
return Column.physicalColumn(
column.name(), MySqlTypeUtils.fromDbzColumn(column), column.comment());
column.name(),
MySqlTypeUtils.fromDbzColumn(column, jdbcProperties),
column.comment());
}

public static io.debezium.relational.TableId toDbzTableId(TableId tableId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;

import com.mysql.cj.conf.PropertyKey;
import io.debezium.relational.Column;

import java.util.Properties;

/** Utilities for converting from MySQL types to {@link DataType}s. */
public class MySqlTypeUtils {

Expand Down Expand Up @@ -109,8 +112,8 @@ public class MySqlTypeUtils {
private static final String UNKNOWN = "UNKNOWN";

/** Returns a corresponding Flink data type from a debezium {@link Column}. */
public static DataType fromDbzColumn(Column column) {
DataType dataType = convertFromColumn(column);
public static DataType fromDbzColumn(Column column, Properties jdbcProperties) {
DataType dataType = convertFromColumn(column, jdbcProperties);
if (column.isOptional()) {
return dataType;
} else {
Expand All @@ -122,7 +125,7 @@ public static DataType fromDbzColumn(Column column) {
* Returns a corresponding Flink data type from a debezium {@link Column}; the returned type is
* always nullable.
*/
private static DataType convertFromColumn(Column column) {
private static DataType convertFromColumn(Column column, Properties jdbcProperties) {
String typeName = column.typeName();
switch (typeName) {
case BIT:
Expand All @@ -137,7 +140,13 @@ private static DataType convertFromColumn(Column column) {
// Users should not use tinyint(1) to store numbers. Although the JDBC URL parameter
// tinyInt1isBit=false can change the returned value, it is not a general solution.
// Note: MyBatis and mysql-connector-java map tinyint(1) to boolean by default.
return column.length() == 1 ? DataTypes.BOOLEAN() : DataTypes.TINYINT();
boolean tinyInt1isBit =
Boolean.parseBoolean(
jdbcProperties.getProperty(
PropertyKey.tinyInt1isBit.getKeyName(), "true"));
return (column.length() == 1 && tinyInt1isBit)
? DataTypes.BOOLEAN()
: DataTypes.TINYINT();
case TINYINT_UNSIGNED:
case TINYINT_UNSIGNED_ZEROFILL:
case SMALLINT:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.mysql.cj.conf.PropertyKey;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
Expand All @@ -41,6 +42,7 @@
import java.time.ZoneId;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -102,13 +104,23 @@ public void testMysql8AccessDatabaseAndTable() {
}

@Test
public void testMysql57AccessCommonTypesSchema() {
testAccessCommonTypesSchema(fullTypesMySql57Database);
public void testMysql57AccessCommonTypesSchemaTinyintIsBit() {
testAccessCommonTypesSchema(fullTypesMySql57Database, true);
}

@Test
public void testMysql8AccessCommonTypesSchema() {
testAccessCommonTypesSchema(fullTypesMySql8Database);
public void testMysql57AccessCommonTypesSchemaTinyintIsNotBit() {
testAccessCommonTypesSchema(fullTypesMySql57Database, false);
}

@Test
public void testMysql8AccessCommonTypesSchemaTinyintIsBit() {
testAccessCommonTypesSchema(fullTypesMySql8Database, true);
}

@Test
public void testMysql8AccessCommonTypesSchemaTinyintIsNotBit() {
testAccessCommonTypesSchema(fullTypesMySql8Database, false);
}

@Test
Expand All @@ -117,7 +129,7 @@ public void testMysql57AccessTimeTypesSchema() {

String[] tables = new String[] {"time_types"};
MySqlMetadataAccessor metadataAccessor =
getMetadataAccessor(tables, fullTypesMySql57Database);
getMetadataAccessor(tables, fullTypesMySql57Database, true);

Schema actualSchema =
metadataAccessor.getTableSchema(
Expand Down Expand Up @@ -163,7 +175,7 @@ public void testMysql8AccessTimeTypesSchema() {

String[] tables = new String[] {"time_types"};
MySqlMetadataAccessor metadataAccessor =
getMetadataAccessor(tables, fullTypesMySql8Database);
getMetadataAccessor(tables, fullTypesMySql8Database, true);

Schema actualSchema =
metadataAccessor.getTableSchema(
Expand Down Expand Up @@ -211,7 +223,7 @@ private void testAccessDatabaseAndTable(UniqueDatabase database) {
database.createAndInitialize();

String[] tables = new String[] {"common_types", "time_types", "precision_types"};
MySqlMetadataAccessor metadataAccessor = getMetadataAccessor(tables, database);
MySqlMetadataAccessor metadataAccessor = getMetadataAccessor(tables, database, true);

assertThatThrownBy(metadataAccessor::listNamespaces)
.isInstanceOf(UnsupportedOperationException.class);
Expand All @@ -227,11 +239,12 @@ private void testAccessDatabaseAndTable(UniqueDatabase database) {
assertThat(actualTables).containsExactlyInAnyOrderElementsOf(expectedTables);
}

private void testAccessCommonTypesSchema(UniqueDatabase database) {
private void testAccessCommonTypesSchema(UniqueDatabase database, boolean tinyint1IsBit) {
database.createAndInitialize();

String[] tables = new String[] {"common_types"};
MySqlMetadataAccessor metadataAccessor = getMetadataAccessor(tables, database);
MySqlMetadataAccessor metadataAccessor =
getMetadataAccessor(tables, database, tinyint1IsBit);

Schema actualSchema =
metadataAccessor.getTableSchema(
Expand Down Expand Up @@ -277,8 +290,12 @@ private void testAccessCommonTypesSchema(UniqueDatabase database) {
DataTypes.STRING(),
DataTypes.BOOLEAN(),
DataTypes.BINARY(1),
DataTypes.BOOLEAN(),
DataTypes.BOOLEAN(),
tinyint1IsBit
? DataTypes.BOOLEAN()
: DataTypes.TINYINT(),
tinyint1IsBit
? DataTypes.BOOLEAN()
: DataTypes.TINYINT(),
DataTypes.BINARY(16),
DataTypes.BINARY(8),
DataTypes.STRING(),
Expand Down Expand Up @@ -357,17 +374,22 @@ private void testAccessCommonTypesSchema(UniqueDatabase database) {
assertThat(actualSchema).isEqualTo(expectedSchema);
}

private MySqlMetadataAccessor getMetadataAccessor(String[] tables, UniqueDatabase database) {
MySqlSourceConfig sourceConfig = getConfig(tables, database);
private MySqlMetadataAccessor getMetadataAccessor(
String[] tables, UniqueDatabase database, boolean tinyint1IsBit) {
MySqlSourceConfig sourceConfig = getConfig(tables, database, tinyint1IsBit);
return new MySqlMetadataAccessor(sourceConfig);
}

private MySqlSourceConfig getConfig(String[] captureTables, UniqueDatabase database) {
private MySqlSourceConfig getConfig(
String[] captureTables, UniqueDatabase database, boolean tinyint1IsBit) {
String[] captureTableIds =
Arrays.stream(captureTables)
.map(tableName -> database.getDatabaseName() + "." + tableName)
.toArray(String[]::new);

Properties jdbcProperties = new Properties();
jdbcProperties.put(PropertyKey.tinyInt1isBit.getKeyName(), String.valueOf(tinyint1IsBit));

return new MySqlSourceConfigFactory()
.startupOptions(StartupOptions.latest())
.databaseList(database.getDatabaseName())
Expand All @@ -380,6 +402,7 @@ private MySqlSourceConfig getConfig(String[] captureTables, UniqueDatabase datab
.username(database.getUsername())
.password(database.getPassword())
.serverTimeZone(ZoneId.of("UTC").toString())
.jdbcProperties(jdbcProperties)
.createConfig(0);
}
}
Loading
Loading