diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/clickhouse/dialect/ClickHouseDialect.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/clickhouse/dialect/ClickHouseDialect.java index 620e8935..e27296d3 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/clickhouse/dialect/ClickHouseDialect.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/clickhouse/dialect/ClickHouseDialect.java @@ -80,6 +80,7 @@ public Set supportedTypes() { LogicalTypeRoot.FLOAT, LogicalTypeRoot.DOUBLE, LogicalTypeRoot.DATE, + LogicalTypeRoot.MAP, LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE); } diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/clickhouse/dialect/ClickHouseRowConvert.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/clickhouse/dialect/ClickHouseRowConvert.java index 1fd1d8e5..f8aa16b2 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/clickhouse/dialect/ClickHouseRowConvert.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/clickhouse/dialect/ClickHouseRowConvert.java @@ -2,6 +2,7 @@ import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter; import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericMapData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.logical.DecimalType; @@ -16,6 +17,7 @@ import java.math.BigInteger; import java.time.LocalDate; import java.time.LocalDateTime; +import java.util.Map; /** * Runtime converter that responsible to convert between JDBC object and Flink internal object for @@ -79,8 +81,20 @@ protected JdbcDeserializationConverter createInternalConverter(LogicalType type) case TIMESTAMP_WITHOUT_TIME_ZONE: case TIMESTAMP_WITH_LOCAL_TIME_ZONE: return val -> TimestampData.fromLocalDateTime((LocalDateTime) val); + case MAP: + return val -> new GenericMapData((Map) val); default: return super.createInternalConverter(type); } } + + @Override + protected JdbcSerializationConverter createExternalConverter(LogicalType type) { + switch (type.getTypeRoot()) { + case MAP: + return (val, index, statement) -> statement.setObject(index, val); + default: + return super.createExternalConverter(type); + } + } } diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/utils/JdbcTypeUtil.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/utils/JdbcTypeUtil.java index 4c4c434b..d44a8dc8 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/utils/JdbcTypeUtil.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/utils/JdbcTypeUtil.java @@ -51,6 +51,7 @@ import static org.apache.flink.table.types.logical.LogicalTypeRoot.DOUBLE; import static org.apache.flink.table.types.logical.LogicalTypeRoot.FLOAT; import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTEGER; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.MAP; import static org.apache.flink.table.types.logical.LogicalTypeRoot.SMALLINT; import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE; import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE; @@ -107,6 +108,7 @@ public class JdbcTypeUtil { put(TIME_WITHOUT_TIME_ZONE, Types.TIME); put(DECIMAL, Types.DECIMAL); put(ARRAY, Types.ARRAY); + put(MAP, Types.JAVA_OBJECT); } }); diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/clickhouse/table/ClickHouseTableSourceITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/clickhouse/table/ClickHouseTableSourceITCase.java index 0dd88ed0..ef5580ab 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/clickhouse/table/ClickHouseTableSourceITCase.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/clickhouse/table/ClickHouseTableSourceITCase.java @@ -10,6 +10,7 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.TimeZone; @@ -38,13 +39,19 @@ protected ClickhouseTableRow createInputTable() { field("timestamp6_col", dbType("DateTime(6)"), DataTypes.TIMESTAMP(6)), field("decimal_column", dbType("Decimal(3,1)"), DataTypes.DECIMAL(3, 1)), field("bool_flag", dbType("Bool"), DataTypes.BOOLEAN()), - field("message", dbType("String"), DataTypes.VARCHAR(100))); + field("message", dbType("String"), DataTypes.VARCHAR(100)), + field( + "test_map", + dbType("Map(Int64,Int64)"), + DataTypes.MAP(DataTypes.BIGINT(), DataTypes.BIGINT()))); } @Override protected List getTestData() { TimeZone timeZone = TimeZone.getTimeZone("GTM+0"); TimeZone.setDefault(timeZone); + HashMap map = new HashMap<>(); + map.put(1L, 2L); return Arrays.asList( Row.of( 1L, @@ -59,7 +66,8 @@ protected List getTestData() { LocalDateTime.parse("2020-01-01T15:35:00.123456"), BigDecimal.valueOf(-99.9), true, - "this is a test message"), + "this is a test message", + map), Row.of( 2L, (byte) 2, @@ -73,6 +81,7 @@ protected List getTestData() { LocalDateTime.parse("2020-01-01T15:36:01.123456"), BigDecimal.valueOf(99.9), false, - "this is a test message")); + "this is a test message", + map)); } }