Skip to content

Commit

Permalink
[FLINK-36408] Fix MySQL pipeline connector failed to parse FLOAT type…
Browse files Browse the repository at this point in the history
… with precision

Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com>
  • Loading branch information
yuxiqian committed Sep 30, 2024
1 parent 4b13c49 commit fd469c2
Show file tree
Hide file tree
Showing 5 changed files with 248 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,12 @@ private static DataType convertFromColumn(Column column) {
case FLOAT:
case FLOAT_UNSIGNED:
case FLOAT_UNSIGNED_ZEROFILL:
return DataTypes.FLOAT();
if (column.length() != -1) {
// For FLOAT types with length provided explicitly, treat it like DOUBLE
return DataTypes.DOUBLE();
} else {
return DataTypes.FLOAT();
}
case REAL:
case REAL_UNSIGNED:
case REAL_UNSIGNED_ZEROFILL:
Expand Down Expand Up @@ -236,7 +241,7 @@ private static DataType convertFromColumn(Column column) {
return DataTypes.ARRAY(DataTypes.STRING());
default:
throw new UnsupportedOperationException(
String.format("Don't support MySQL type '%s' yet.", typeName));
String.format("MySQL type '%s' is not supported yet.", typeName));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,18 @@ public void testMysqlPrecisionTypes(UniqueDatabase database) throws Throwable {
DataTypes.TIMESTAMP_LTZ(0),
DataTypes.TIMESTAMP_LTZ(3),
DataTypes.TIMESTAMP_LTZ(6),
DataTypes.TIMESTAMP_LTZ(0));
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE());

Object[] expectedSnapshot =
new Object[] {
Expand All @@ -265,7 +276,19 @@ public void testMysqlPrecisionTypes(UniqueDatabase database) throws Throwable {
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:00")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22"))
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
2d,
3d,
5d,
7d,
11d,
13d,
17d,
19d,
23d,
29d,
31d,
37d
};

Object[] expectedStreamRecord =
Expand All @@ -282,7 +305,19 @@ public void testMysqlPrecisionTypes(UniqueDatabase database) throws Throwable {
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:00")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22"))
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
2d,
3d,
5d,
7d,
11d,
13d,
17d,
19d,
23d,
29d,
31d,
37d
};

database.createAndInitialize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,156 @@ public void testMysql8AccessTimeTypesSchema() {
assertThat(actualSchema).isEqualTo(expectedSchema);
}

@Test
public void testMysql57PrecisionTypesSchema() {
fullTypesMySql57Database.createAndInitialize();

String[] tables = new String[] {"precision_types"};
MySqlMetadataAccessor metadataAccessor =
getMetadataAccessor(tables, fullTypesMySql57Database);

Schema actualSchema =
metadataAccessor.getTableSchema(
TableId.tableId(
fullTypesMySql57Database.getDatabaseName(), "precision_types"));
Schema expectedSchema =
Schema.newBuilder()
.primaryKey("id")
.fromRowDataType(
RowType.of(
new DataType[] {
DataTypes.DECIMAL(20, 0).notNull(),
DataTypes.DECIMAL(6, 2),
DataTypes.DECIMAL(9, 4),
DataTypes.DECIMAL(20, 4),
DataTypes.TIME(0),
DataTypes.TIME(3),
DataTypes.TIME(6),
DataTypes.TIMESTAMP(0),
DataTypes.TIMESTAMP(3),
DataTypes.TIMESTAMP(6),
DataTypes.TIMESTAMP_LTZ(0),
DataTypes.TIMESTAMP_LTZ(3),
DataTypes.TIMESTAMP_LTZ(6),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE()
},
new String[] {
"id",
"decimal_c0",
"decimal_c1",
"decimal_c2",
"time_c",
"time_3_c",
"time_6_c",
"datetime_c",
"datetime3_c",
"datetime6_c",
"timestamp_c",
"timestamp3_c",
"timestamp6_c",
"float_c0",
"float_c1",
"float_c2",
"real_c0",
"real_c1",
"real_c2",
"double_c0",
"double_c1",
"double_c2",
"double_precision_c0",
"double_precision_c1",
"double_precision_c2"
}))
.build();
assertThat(actualSchema).isEqualTo(expectedSchema);
}

@Test
public void testMysql8PrecisionTypesSchema() {
fullTypesMySql8Database.createAndInitialize();

String[] tables = new String[] {"precision_types"};
MySqlMetadataAccessor metadataAccessor =
getMetadataAccessor(tables, fullTypesMySql8Database);

Schema actualSchema =
metadataAccessor.getTableSchema(
TableId.tableId(
fullTypesMySql8Database.getDatabaseName(), "precision_types"));
Schema expectedSchema =
Schema.newBuilder()
.primaryKey("id")
.fromRowDataType(
RowType.of(
new DataType[] {
DataTypes.DECIMAL(20, 0).notNull(),
DataTypes.DECIMAL(6, 2),
DataTypes.DECIMAL(9, 4),
DataTypes.DECIMAL(20, 4),
DataTypes.TIME(0),
DataTypes.TIME(3),
DataTypes.TIME(6),
DataTypes.TIMESTAMP(0),
DataTypes.TIMESTAMP(3),
DataTypes.TIMESTAMP(6),
DataTypes.TIMESTAMP_LTZ(0),
DataTypes.TIMESTAMP_LTZ(3),
DataTypes.TIMESTAMP_LTZ(6),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
DataTypes.DOUBLE()
},
new String[] {
"id",
"decimal_c0",
"decimal_c1",
"decimal_c2",
"time_c",
"time_3_c",
"time_6_c",
"datetime_c",
"datetime3_c",
"datetime6_c",
"timestamp_c",
"timestamp3_c",
"timestamp6_c",
"float_c0",
"float_c1",
"float_c2",
"real_c0",
"real_c1",
"real_c2",
"double_c0",
"double_c1",
"double_c2",
"double_precision_c0",
"double_precision_c1",
"double_precision_c2"
}))
.build();
assertThat(actualSchema).isEqualTo(expectedSchema);
}

private void testAccessDatabaseAndTable(UniqueDatabase database) {
database.createAndInitialize();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,18 @@ CREATE TABLE precision_types
timestamp_c TIMESTAMP(0) NULL,
timestamp3_c TIMESTAMP(3) NULL,
timestamp6_c TIMESTAMP(6) NULL,
float_c0 FLOAT(6, 0),
float_c1 FLOAT(20, 3),
float_c2 FLOAT(24, 12),
real_c0 REAL(6, 0),
real_c1 REAL(20, 3),
real_c2 REAL(24, 12),
double_c0 DOUBLE(6, 0),
double_c1 DOUBLE(20, 3),
double_c2 DOUBLE(24, 12),
double_precision_c0 DOUBLE PRECISION(6, 0),
double_precision_c1 DOUBLE PRECISION(20, 3),
double_precision_c2 DOUBLE PRECISION(24, 12),
PRIMARY KEY (id)
) DEFAULT CHARSET=utf8;

Expand All @@ -156,4 +168,16 @@ VALUES (DEFAULT,
'2020-07-17 18:00:22',
'2020-07-17 18:00',
'2020-07-17 18:00:22',
'2020-07-17 18:00:22');
'2020-07-17 18:00:22',
2,
3,
5,
7,
11,
13,
17,
19,
23,
29,
31,
37);
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,21 @@ CREATE TABLE precision_types
datetime_c DATETIME(0),
datetime3_c DATETIME(3),
datetime6_c DATETIME(6),
timestamp_c TIMESTAMP(0),
timestamp3_c TIMESTAMP(3),
timestamp6_c TIMESTAMP(6),
timestamp_c TIMESTAMP(0) NULL,
timestamp3_c TIMESTAMP(3) NULL,
timestamp6_c TIMESTAMP(6) NULL,
float_c0 FLOAT(6, 0),
float_c1 FLOAT(20, 3),
float_c2 FLOAT(24, 12),
real_c0 REAL(6, 0),
real_c1 REAL(20, 3),
real_c2 REAL(24, 12),
double_c0 DOUBLE(6, 0),
double_c1 DOUBLE(20, 3),
double_c2 DOUBLE(24, 12),
double_precision_c0 DOUBLE PRECISION(6, 0),
double_precision_c1 DOUBLE PRECISION(20, 3),
double_precision_c2 DOUBLE PRECISION(24, 12),
PRIMARY KEY (id)
) DEFAULT CHARSET=utf8;

Expand All @@ -160,4 +172,16 @@ VALUES (DEFAULT,
'2020-07-17 18:00:22',
'2020-07-17 18:00',
'2020-07-17 18:00:22',
'2020-07-17 18:00:22');
'2020-07-17 18:00:22',
2,
3,
5,
7,
11,
13,
17,
19,
23,
29,
31,
37);

0 comments on commit fd469c2

Please sign in to comment.