Skip to content

Commit

Permalink
[FLINK-32068] connector jdbc support clickhouse ,support map types
Browse files Browse the repository at this point in the history
  • Loading branch information
WenDing-Y committed Jun 7, 2023
1 parent 59b1122 commit 667e72b
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public Set<LogicalTypeRoot> supportedTypes() {
LogicalTypeRoot.FLOAT,
LogicalTypeRoot.DOUBLE,
LogicalTypeRoot.DATE,
LogicalTypeRoot.MAP,
LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.DecimalType;
Expand All @@ -16,6 +17,7 @@
import java.math.BigInteger;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Map;

/**
* Runtime converter that responsible to convert between JDBC object and Flink internal object for
Expand Down Expand Up @@ -79,8 +81,20 @@ protected JdbcDeserializationConverter createInternalConverter(LogicalType type)
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return val -> TimestampData.fromLocalDateTime((LocalDateTime) val);
case MAP:
return val -> new GenericMapData((Map<?, ?>) val);
default:
return super.createInternalConverter(type);
}
}

@Override
protected JdbcSerializationConverter createExternalConverter(LogicalType type) {
switch (type.getTypeRoot()) {
case MAP:
return (val, index, statement) -> statement.setObject(index, val);
default:
return super.createExternalConverter(type);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import static org.apache.flink.table.types.logical.LogicalTypeRoot.DOUBLE;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.FLOAT;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTEGER;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.MAP;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.SMALLINT;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE;
Expand Down Expand Up @@ -107,6 +108,7 @@ public class JdbcTypeUtil {
put(TIME_WITHOUT_TIME_ZONE, Types.TIME);
put(DECIMAL, Types.DECIMAL);
put(ARRAY, Types.ARRAY);
put(MAP, Types.JAVA_OBJECT);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.TimeZone;

Expand Down Expand Up @@ -38,13 +39,19 @@ protected ClickhouseTableRow createInputTable() {
field("timestamp6_col", dbType("DateTime(6)"), DataTypes.TIMESTAMP(6)),
field("decimal_column", dbType("Decimal(3,1)"), DataTypes.DECIMAL(3, 1)),
field("bool_flag", dbType("Bool"), DataTypes.BOOLEAN()),
field("message", dbType("String"), DataTypes.VARCHAR(100)));
field("message", dbType("String"), DataTypes.VARCHAR(100)),
field(
"test_map",
dbType("Map(Int64,Int64)"),
DataTypes.MAP(DataTypes.BIGINT(), DataTypes.BIGINT())));
}

@Override
protected List<Row> getTestData() {
TimeZone timeZone = TimeZone.getTimeZone("GTM+0");
TimeZone.setDefault(timeZone);
HashMap<Long, Long> map = new HashMap<>();
map.put(1L, 2L);
return Arrays.asList(
Row.of(
1L,
Expand All @@ -59,7 +66,8 @@ protected List<Row> getTestData() {
LocalDateTime.parse("2020-01-01T15:35:00.123456"),
BigDecimal.valueOf(-99.9),
true,
"this is a test message"),
"this is a test message",
map),
Row.of(
2L,
(byte) 2,
Expand All @@ -73,6 +81,7 @@ protected List<Row> getTestData() {
LocalDateTime.parse("2020-01-01T15:36:01.123456"),
BigDecimal.valueOf(99.9),
false,
"this is a test message"));
"this is a test message",
map));
}
}

0 comments on commit 667e72b

Please sign in to comment.