Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public static DeserializationRuntimeConverterFactory instance() {
public Optional<DeserializationRuntimeConverter> createUserDefinedConverter(
LogicalType logicalType, ZoneId serverTimeZone) {
switch (logicalType.getTypeRoot()) {
case TINYINT:
return createTinyIntConverter();
case CHAR:
case VARCHAR:
return createStringConverter();
Expand Down Expand Up @@ -148,6 +150,23 @@ public Object convert(Object dbzObj, Schema schema) throws Exception {
}
}

private static Optional<DeserializationRuntimeConverter> createTinyIntConverter() {

return Optional.of(
new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;

@Override
public Object convert(Object dbzObj, Schema schema) throws Exception {
if (dbzObj instanceof Boolean) {
return dbzObj == Boolean.TRUE ? (byte) 1 : (byte) 0;
} else {
return Byte.parseByte(dbzObj.toString());
}
}
});
}

private static boolean hasFamily(LogicalType logicalType, LogicalTypeFamily family) {
return logicalType.getTypeRoot().getFamilies().contains(family);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1592,78 +1592,6 @@ public void testAlterWithDefaultStringValue() throws Exception {
jobClient.cancel().get();
}

@Test
public void testShardingTablesWithInconsistentSchema() throws Exception {
userDatabase1.createAndInitialize();
userDatabase2.createAndInitialize();
String sourceDDL =
String.format(
"CREATE TABLE `user` ("
+ " `id` DECIMAL(20, 0) NOT NULL,"
+ " name STRING,"
+ " address STRING,"
+ " phone_number STRING,"
+ " email STRING,"
+ " age INT,"
+ " primary key (`id`) not enforced"
+ ") WITH ("
+ " 'connector' = 'mysql-cdc',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'server-time-zone' = 'UTC',"
+ " 'server-id' = '%s',"
+ " 'scan.incremental.snapshot.chunk.size' = '%s'"
+ ")",
MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort(),
userDatabase1.getUsername(),
userDatabase1.getPassword(),
String.format(
"(%s|%s)",
userDatabase1.getDatabaseName(), userDatabase2.getDatabaseName()),
"user_table_.*",
incrementalSnapshot,
getServerId(),
getSplitSize());
tEnv.executeSql(sourceDDL);

// async submit job
TableResult result = tEnv.executeSql("SELECT * FROM `user`");

CloseableIterator<Row> iterator = result.collect();
waitForSnapshotStarted(iterator);

try (Connection connection = userDatabase1.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute("UPDATE user_table_1_1 SET email = 'user_111@bar.org' WHERE id=111;");
}

try (Connection connection = userDatabase2.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute("UPDATE user_table_2_2 SET age = 20 WHERE id=221;");
}

String[] expected =
new String[] {
"+I[111, user_111, Shanghai, 123567891234, user_111@foo.com, null]",
"-U[111, user_111, Shanghai, 123567891234, user_111@foo.com, null]",
"+U[111, user_111, Shanghai, 123567891234, user_111@bar.org, null]",
"+I[121, user_121, Shanghai, 123567891234, null, null]",
"+I[211, user_211, Shanghai, 123567891234, null, null]",
"+I[221, user_221, Shanghai, 123567891234, null, 18]",
"-U[221, user_221, Shanghai, 123567891234, null, 18]",
"+U[221, user_221, Shanghai, 123567891234, null, 20]",
};

assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
result.getJobClient().get().cancel().get();
}

@Test
public void testStartupFromSpecificBinlogFilePos() throws Exception {
inventoryDatabase.createAndInitialize();
Expand Down
Loading