diff --git a/flink-connector-jdbc/pom.xml b/flink-connector-jdbc/pom.xml
index 0a298e8b..e6c08d61 100644
--- a/flink-connector-jdbc/pom.xml
+++ b/flink-connector-jdbc/pom.xml
@@ -206,6 +206,21 @@ under the License.
<scope>test</scope>
+
+ <dependency>
+ <groupId>com.clickhouse</groupId>
+ <artifactId>clickhouse-jdbc</artifactId>
+ <version>0.4.6</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>clickhouse</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/clickhouse/dialect/ClickHouseDialect.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/clickhouse/dialect/ClickHouseDialect.java
new file mode 100644
index 00000000..e27296d3
--- /dev/null
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/clickhouse/dialect/ClickHouseDialect.java
@@ -0,0 +1,88 @@
+package org.apache.flink.connector.jdbc.databases.clickhouse.dialect;
+
+import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter;
+import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.EnumSet;
+import java.util.Optional;
+import java.util.Set;
+
+/** JDBC dialect for ClickHouse. */
+public class ClickHouseDialect extends AbstractDialect {
+
+ private static final long serialVersionUID = 1L;
+
+ // Define MAX/MIN precision of the TIMESTAMP type according to the ClickHouse docs:
+ // https://clickhouse.com/docs/en/sql-reference/data-types/datetime64
+ private static final int MAX_TIMESTAMP_PRECISION = 9;
+ private static final int MIN_TIMESTAMP_PRECISION = 0;
+
+ // Define MAX/MIN precision of the DECIMAL type according to the ClickHouse docs:
+ // https://clickhouse.com/docs/en/sql-reference/data-types/decimal
+ private static final int MAX_DECIMAL_PRECISION = 76;
+ private static final int MIN_DECIMAL_PRECISION = 1;
+
+ @Override
+ public AbstractJdbcRowConverter getRowConverter(RowType rowType) {
+ return new ClickHouseRowConvert(rowType);
+ }
+
+ @Override
+ public String getLimitClause(long limit) {
+ return "LIMIT " + limit;
+ }
+
+ @Override
+ public Optional<String> defaultDriverName() {
+ return Optional.of("com.clickhouse.jdbc.ClickHouseDriver");
+ }
+
+ @Override
+ public String quoteIdentifier(String identifier) {
+ return "`" + identifier + "`";
+ }
+
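+ // ClickHouse has no ANSI-style upsert statement; returning empty signals the
+ // JDBC sink to fall back to its non-upsert write path.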
+ @Override
+ public Optional<String> getUpsertStatement(
+ String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+ return Optional.empty();
+ }
+
+ @Override
+ public String dialectName() {
+ return "ClickHouse";
+ }
+
+ @Override
+ public Optional<Range> timestampPrecisionRange() {
+ return Optional.of(Range.of(MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION));
+ }
+
+ @Override
+ public Optional<Range> decimalPrecisionRange() {
+ return Optional.of(Range.of(MIN_DECIMAL_PRECISION, MAX_DECIMAL_PRECISION));
+ }
+
+ @Override
+ public Set<LogicalTypeRoot> supportedTypes() {
+ return EnumSet.of(
+ LogicalTypeRoot.CHAR,
+ LogicalTypeRoot.VARCHAR,
+ LogicalTypeRoot.BOOLEAN,
+ LogicalTypeRoot.DECIMAL,
+ LogicalTypeRoot.TINYINT,
+ LogicalTypeRoot.SMALLINT,
+ LogicalTypeRoot.INTEGER,
+ LogicalTypeRoot.BIGINT,
+ LogicalTypeRoot.FLOAT,
+ LogicalTypeRoot.DOUBLE,
+ LogicalTypeRoot.DATE,
+ LogicalTypeRoot.MAP,
+ LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
+ LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE);
+ }
+}
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/clickhouse/dialect/ClickHouseDialectFactory.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/clickhouse/dialect/ClickHouseDialectFactory.java
new file mode 100644
index 00000000..a1eaf304
--- /dev/null
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/clickhouse/dialect/ClickHouseDialectFactory.java
@@ -0,0 +1,19 @@
+package org.apache.flink.connector.jdbc.databases.clickhouse.dialect;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;
+
+/** Factory for the ClickHouse dialect. */
+@Internal
+public class ClickHouseDialectFactory implements JdbcDialectFactory {
+ @Override
+ public boolean acceptsURL(String url) {
+ return url.startsWith("jdbc:clickhouse:");
+ }
+
+ @Override
+ public JdbcDialect create() {
+ return new ClickHouseDialect();
+ }
+}
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/clickhouse/dialect/ClickHouseRowConvert.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/clickhouse/dialect/ClickHouseRowConvert.java
new file mode 100644
index 00000000..f8aa16b2
--- /dev/null
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/clickhouse/dialect/ClickHouseRowConvert.java
@@ -0,0 +1,104 @@
+package org.apache.flink.connector.jdbc.databases.clickhouse.dialect;
+
+import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import com.clickhouse.data.value.UnsignedByte;
+import com.clickhouse.data.value.UnsignedInteger;
+import com.clickhouse.data.value.UnsignedShort;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Map;
+
+/**
+ * Runtime converter responsible for converting between JDBC objects and Flink internal objects
+ * for the ClickHouse data types; see
+ * https://clickhouse.com/docs/en/sql-reference/data-types/int-uint.
+ */
+public class ClickHouseRowConvert extends AbstractJdbcRowConverter {
+ @Override
+ public String converterName() {
+ return "ClickHouse";
+ }
+
+ public ClickHouseRowConvert(RowType rowType) {
+ super(rowType);
+ }
+
+ @Override
+ protected JdbcDeserializationConverter createInternalConverter(LogicalType type) {
+ switch (type.getTypeRoot()) {
+ case NULL:
+ return null;
+ case BOOLEAN:
+ case FLOAT:
+ case DOUBLE:
+ return val -> val;
+ case TINYINT:
+ return val -> ((Byte) val).byteValue();
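+ // ClickHouse unsigned types are widened to the next larger signed Flink type:
+ // UInt8 -> SMALLINT, UInt16 -> INT, UInt32 -> BIGINT.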
+ case SMALLINT:
+ return val ->
+ val instanceof UnsignedByte
+ ? ((UnsignedByte) val).shortValue()
+ : ((Short) val).shortValue();
+ case INTEGER:
+ return val ->
+ val instanceof UnsignedShort
+ ? ((UnsignedShort) val).intValue()
+ : ((Integer) val).intValue();
+ case BIGINT:
+ return jdbcField -> {
+ if (jdbcField instanceof UnsignedInteger) {
+ return ((UnsignedInteger) jdbcField).longValue();
+ } else if (jdbcField instanceof Long) {
+ return ((Long) jdbcField).longValue();
+ }
+ // UInt64 is not supported: its value range exceeds that of Java's long.
+ throw new UnsupportedOperationException("Unsupported type:" + type);
+ };
+ case DECIMAL:
+ final int precision = ((DecimalType) type).getPrecision();
+ final int scale = ((DecimalType) type).getScale();
+ return val ->
+ val instanceof BigInteger
+ ? DecimalData.fromBigDecimal(
+ new BigDecimal((BigInteger) val, 0), precision, scale)
+ : DecimalData.fromBigDecimal((BigDecimal) val, precision, scale);
+ case CHAR:
+ case VARCHAR:
+ return val -> StringData.fromString((String) val);
+ case DATE:
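+ // Flink represents DATE internally as an int of days since the epoch.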
+ return val -> (int) ((LocalDate) val).toEpochDay();
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return val -> TimestampData.fromLocalDateTime((LocalDateTime) val);
+ case MAP:
+ return val -> new GenericMapData((Map<?, ?>) val);
+ default:
+ return super.createInternalConverter(type);
+ }
+ }
+
+ @Override
+ protected JdbcSerializationConverter createExternalConverter(LogicalType type) {
+ switch (type.getTypeRoot()) {
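+ // Map values are passed through to the driver unchanged via setObject.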
+ case MAP:
+ return (val, index, statement) -> statement.setObject(index, val);
+ default:
+ return super.createExternalConverter(type);
+ }
+ }
+}
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/utils/JdbcTypeUtil.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/utils/JdbcTypeUtil.java
index 4c4c434b..d44a8dc8 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/utils/JdbcTypeUtil.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/utils/JdbcTypeUtil.java
@@ -51,6 +51,7 @@
import static org.apache.flink.table.types.logical.LogicalTypeRoot.DOUBLE;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.FLOAT;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTEGER;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.MAP;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.SMALLINT;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE;
@@ -107,6 +108,7 @@ public class JdbcTypeUtil {
put(TIME_WITHOUT_TIME_ZONE, Types.TIME);
put(DECIMAL, Types.DECIMAL);
put(ARRAY, Types.ARRAY);
+ put(MAP, Types.JAVA_OBJECT);
}
});
diff --git a/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory b/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory
index e5a05b77..ffa20664 100644
--- a/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory
+++ b/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory
@@ -19,3 +19,4 @@ org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresDialectFactor
org.apache.flink.connector.jdbc.databases.oracle.dialect.OracleDialectFactory
org.apache.flink.connector.jdbc.databases.sqlserver.dialect.SqlServerDialectFactory
org.apache.flink.connector.jdbc.databases.cratedb.dialect.CrateDBDialectFactory
+org.apache.flink.connector.jdbc.databases.clickhouse.dialect.ClickHouseDialectFactory
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/clickhouse/ClickHouseTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/clickhouse/ClickHouseTestBase.java
new file mode 100644
index 00000000..612d9bd1
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/clickhouse/ClickHouseTestBase.java
@@ -0,0 +1,17 @@
+package org.apache.flink.connector.jdbc.databases.clickhouse;
+
+import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.DatabaseTest;
+import org.apache.flink.connector.jdbc.testutils.databases.clickhouse.ClickHouseDatabase;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/** Base interface for ClickHouse testing. */
+@ExtendWith(ClickHouseDatabase.class)
+public interface ClickHouseTestBase extends DatabaseTest {
+
+ @Override
+ default DatabaseMetadata getMetadata() {
+ return ClickHouseDatabase.getMetadata();
+ }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/clickhouse/dialect/ClickHouseDialectTypeTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/clickhouse/dialect/ClickHouseDialectTypeTest.java
new file mode 100644
index 00000000..8380de23
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/clickhouse/dialect/ClickHouseDialectTypeTest.java
@@ -0,0 +1,41 @@
+package org.apache.flink.connector.jdbc.databases.clickhouse.dialect;
+
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectTypeTest;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** The ClickHouse params for {@link JdbcDialectTypeTest}. */
+public class ClickHouseDialectTypeTest extends JdbcDialectTypeTest {
+
+ @Override
+ protected String testDialect() {
+ return "clickhouse";
+ }
+
+ @Override
+ protected List<TestItem> testData() {
+ return Arrays.asList(
+ createTestItem("CHAR"),
+ createTestItem("VARCHAR"),
+ createTestItem("BOOLEAN"),
+ createTestItem("TINYINT"),
+ createTestItem("SMALLINT"),
+ createTestItem("INTEGER"),
+ createTestItem("BIGINT"),
+ createTestItem("FLOAT"),
+ createTestItem("DOUBLE"),
+ createTestItem("DECIMAL(10, 4)"),
+ createTestItem("DECIMAL(38, 18)"),
+ createTestItem("DATE"),
+ createTestItem("TIMESTAMP(3)"),
+ createTestItem("TIMESTAMP WITHOUT TIME ZONE"),
+ createTestItem("VARBINARY", "The ClickHouse dialect doesn't support type: BYTES"),
+
+ // Not valid data
+ createTestItem("BINARY", "The ClickHouse dialect doesn't support type: BINARY(1)."),
+ createTestItem(
+ "VARBINARY(10)",
+ "The ClickHouse dialect doesn't support type: VARBINARY(10)."));
+ }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/clickhouse/table/ClickHouseTableSourceITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/clickhouse/table/ClickHouseTableSourceITCase.java
new file mode 100644
index 00000000..ef5580ab
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/clickhouse/table/ClickHouseTableSourceITCase.java
@@ -0,0 +1,88 @@
+package org.apache.flink.connector.jdbc.databases.clickhouse.table;
+
+import org.apache.flink.connector.jdbc.databases.clickhouse.ClickHouseTestBase;
+import org.apache.flink.connector.jdbc.databases.clickhouse.dialect.ClickHouseDialect;
+import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceITCase;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.types.Row;
+
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.TimeZone;
+
+import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.ckTableRow;
+import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType;
+import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
+import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.pkField;
+
+/** The Table Source ITCase for {@link ClickHouseDialect}. */
+class ClickHouseTableSourceITCase extends JdbcDynamicTableSourceITCase
+ implements ClickHouseTestBase {
+
+ @Override
+ protected ClickhouseTableRow createInputTable() {
+ return ckTableRow(
+ "jdbDynamicTableSource",
+ pkField("id", dbType("Int64"), DataTypes.BIGINT().notNull()),
+ field("user_id_int8", dbType("Int8"), DataTypes.TINYINT().notNull()),
+ field("user_id_int16", dbType("Int16"), DataTypes.SMALLINT().notNull()),
+ field("user_id_int32", dbType("Int32"), DataTypes.INT().notNull()),
+ field("user_id_int64", dbType("Int64"), DataTypes.BIGINT().notNull()),
+ field("price_float", dbType("Float32"), DataTypes.FLOAT()),
+ field("price_double", dbType("Float64"), DataTypes.DOUBLE()),
+ field("decimal_col", dbType("Decimal64(4)"), DataTypes.DECIMAL(10, 4)),
+ field("user_date", dbType("Date"), DataTypes.DATE()),
+ field("timestamp6_col", dbType("DateTime(6)"), DataTypes.TIMESTAMP(6)),
+ field("decimal_column", dbType("Decimal(3,1)"), DataTypes.DECIMAL(3, 1)),
+ field("bool_flag", dbType("Bool"), DataTypes.BOOLEAN()),
+ field("message", dbType("String"), DataTypes.VARCHAR(100)),
+ field(
+ "test_map",
+ dbType("Map(Int64,Int64)"),
+ DataTypes.MAP(DataTypes.BIGINT(), DataTypes.BIGINT())));
+ }
+
+ @Override
+ protected List<Row> getTestData() {
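+ // Pin the default time zone so DateTime values read back from ClickHouse compare deterministically.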
+ TimeZone timeZone = TimeZone.getTimeZone("GMT+0");
+ TimeZone.setDefault(timeZone);
+ HashMap<Long, Long> map = new HashMap<>();
+ map.put(1L, 2L);
+ return Arrays.asList(
+ Row.of(
+ 1L,
+ (byte) 1,
+ (short) -32768,
+ -2147483648,
+ -9223372036854775808L,
+ -3.4e+38f,
+ -1.7e+308d,
+ BigDecimal.valueOf(100.1234),
+ LocalDate.parse("2023-01-01"),
+ LocalDateTime.parse("2020-01-01T15:35:00.123456"),
+ BigDecimal.valueOf(-99.9),
+ true,
+ "this is a test message",
+ map),
+ Row.of(
+ 2L,
+ (byte) 2,
+ (short) 32767,
+ 2147483647,
+ 9223372036854775807L,
+ 3.4e+38f,
+ 1.7e+308d,
+ BigDecimal.valueOf(101.1234),
+ LocalDate.parse("2023-01-02"),
+ LocalDateTime.parse("2020-01-01T15:36:01.123456"),
+ BigDecimal.valueOf(99.9),
+ false,
+ "this is a test message",
+ map));
+ }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/clickhouse/table/ClickhouseTableRow.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/clickhouse/table/ClickhouseTableRow.java
new file mode 100644
index 00000000..830d00a5
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/clickhouse/table/ClickhouseTableRow.java
@@ -0,0 +1,37 @@
+package org.apache.flink.connector.jdbc.databases.clickhouse.table;
+
+import org.apache.flink.connector.jdbc.testutils.tables.TableField;
+import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
+
+import java.util.stream.Collectors;
+
+/** A {@link TableRow} that generates ClickHouse-compatible DDL. */
+public class ClickhouseTableRow extends TableRow {
+
+ public ClickhouseTableRow(String name, TableField[] fields) {
+ super(name, fields);
+ }
+
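+ // ClickHouse DDL places the engine clause after the column list, and PRIMARY KEY
+ // is part of the MergeTree definition rather than a column constraint.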
+ @Override
+ public String getCreateQuery() {
+ String pkFields =
+ getStreamFields()
+ .filter(TableField::isPkField)
+ .map(TableField::getName)
+ .collect(Collectors.joining(", "));
+ return String.format(
+ "CREATE TABLE %s (%s) %s PRIMARY KEY (%s)",
+ getTableName(),
+ getStreamFields().map(TableField::asString).collect(Collectors.joining(", ")),
+ "ENGINE = MergeTree",
+ pkFields);
+ }
+
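+ // TRUNCATE is used instead of DELETE FROM, which ClickHouse executes as a heavyweight mutation.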
+ @Override
+ protected String getDeleteFromQuery() {
+ return String.format("truncate table %s", getTableName());
+ }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/clickhouse/table/ClickhouseTableSinkITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/clickhouse/table/ClickhouseTableSinkITCase.java
new file mode 100644
index 00000000..d28bd389
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/clickhouse/table/ClickhouseTableSinkITCase.java
@@ -0,0 +1,96 @@
+package org.apache.flink.connector.jdbc.databases.clickhouse.table;
+
+import org.apache.flink.connector.jdbc.databases.clickhouse.ClickHouseTestBase;
+import org.apache.flink.connector.jdbc.databases.clickhouse.dialect.ClickHouseDialect;
+import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSinkITCase;
+import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.types.Row;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+import java.util.TimeZone;
+
+import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.ckTableRow;
+import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType;
+import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
+import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.pkField;
+
+/** The Table Sink ITCase for {@link ClickHouseDialect}. */
+class ClickhouseTableSinkITCase extends JdbcDynamicTableSinkITCase implements ClickHouseTestBase {
+ @Override
+ protected TableRow createUpsertOutputTable() {
+ return ckTableRow(
+ "dynamicSinkForUpsert",
+ pkField("cnt", dbType("Int64"), DataTypes.BIGINT().notNull()),
+ field("lencnt", dbType("Int64"), DataTypes.BIGINT().notNull()),
+ pkField("cTag", DataTypes.INT().notNull()),
+ field("ts", dbType("DateTime"), DataTypes.TIMESTAMP()));
+ }
+
+ @Override
+ protected TableRow createAppendOutputTable() {
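+ // Pin the default time zone so DateTime values compare deterministically.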
+ TimeZone timeZone = TimeZone.getTimeZone("GMT+0");
+ TimeZone.setDefault(timeZone);
+ return ckTableRow(
+ "dynamicSinkForAppend",
+ field("id", DataTypes.INT().notNull()),
+ field("num", dbType("Int64"), DataTypes.BIGINT().notNull()),
+ field("ts", dbType("DateTime64"), DataTypes.TIMESTAMP()));
+ }
+
+ @Override
+ protected TableRow createBatchOutputTable() {
+ return ckTableRow(
+ "dynamicSinkForBatch",
+ field("NAME", DataTypes.VARCHAR(20).notNull()),
+ field("SCORE", dbType("Int64"), DataTypes.BIGINT().notNull()));
+ }
+
+ @Override
+ protected TableRow createUserOutputTable() {
+ return ckTableRow(
+ "USER_TABLE",
+ pkField("user_id", DataTypes.VARCHAR(20).notNull()),
+ field("user_name", DataTypes.VARCHAR(20).notNull()),
+ field("email", DataTypes.VARCHAR(255)),
+ field("balance", DataTypes.DECIMAL(18, 2)),
+ field("balance2", DataTypes.DECIMAL(18, 2)));
+ }
+
+ @Override
+ protected TableRow createRealOutputTable() {
+ return ckTableRow("REAL_TABLE", field("real_data", dbType("REAL"), DataTypes.FLOAT()));
+ }
+
+ @Override
+ protected TableRow createCheckpointOutputTable() {
+ return ckTableRow(
+ "checkpointTable", field("id", dbType("Int64"), DataTypes.BIGINT().notNull()));
+ }
+
+ @Override
+ protected List<Row> testUserData() {
+ return Arrays.asList(
+ Row.of(
+ "user1",
+ "Tom",
+ "tom123@gmail.com",
+ new BigDecimal("8.1"),
+ new BigDecimal("16.2")),
+ Row.of(
+ "user3",
+ "Bailey",
+ "bailey@qq.com",
+ new BigDecimal("9.99"),
+ new BigDecimal("19.98")),
+ Row.of(
+ "user4",
+ "Tina",
+ "tina@gmail.com",
+ new BigDecimal("11.3"),
+ new BigDecimal("22.6")));
+ }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/clickhouse/table/UnsignedTypeConversionITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/clickhouse/table/UnsignedTypeConversionITCase.java
new file mode 100644
index 00000000..10ea067e
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/clickhouse/table/UnsignedTypeConversionITCase.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.clickhouse.table;
+
+import org.apache.flink.connector.jdbc.databases.clickhouse.ClickHouseTestBase;
+import org.apache.flink.connector.jdbc.testutils.TableManaged;
+import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.jupiter.api.Test;
+
+import java.sql.Connection;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static java.lang.String.format;
+import static java.lang.String.join;
+import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.ckTableRow;
+import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType;
+import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
+import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.pkField;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests unsigned type conversion between Flink and the ClickHouse JDBC driver, using a ClickHouse
+ * container as the backing database.
+ */
+class UnsignedTypeConversionITCase extends AbstractTestBase implements ClickHouseTestBase {
+
+ private static final String TABLE_SOURCE = "jdbc_source";
+ private static final String TABLE_SINK = "jdbc_sink";
+ private static final String TABLE_DATA = "data";
+ private static final TableRow TABLE =
+ ckTableRow(
+ "unsigned_test",
+ pkField("id", dbType("Int64"), DataTypes.BIGINT().notNull()),
+ field("small_u", dbType("UInt8"), DataTypes.SMALLINT().notNull()),
+ field("int_u", dbType("UInt16"), DataTypes.INT().notNull()),
+ field("bigint_u", dbType("UInt32"), DataTypes.BIGINT().notNull()));
+
+ @Override
+ public List<TableManaged> getManagedTables() {
+ return Collections.singletonList(TABLE);
+ }
+
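+ // Boundary values of the ClickHouse unsigned types:
+ // UInt8 max = 255, UInt16 max = 65535, UInt32 max = 4294967295.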
+ private static final List<Row> ROW =
+ Arrays.asList(
+ Row.of(1L, (short) 0, 0, 0L), Row.of(2L, (short) 255, 65535, 4294967295L));
+
+ @Test
+ void testUnsignedType() throws Exception {
+ try (Connection con = getMetadata().getConnection()) {
+ StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ TableEnvironment tableEnv = StreamTableEnvironment.create(sEnv);
+ createFlinkTable(tableEnv);
+ prepareData(tableEnv);
+
+ // write data to db
+ String columns = join(",", TABLE.getTableFields());
+ tableEnv.executeSql(
+ format(
+ "insert into %s select %s from %s",
+ TABLE_SINK, columns, TABLE_DATA))
+ .await();
+
+ // read data from db using jdbc connection and compare
+ List<Row> selectAll = TABLE.selectAllTable(con);
+ assertThat(selectAll).isEqualTo(ROW);
+
+ // read data from db using flink and compare
+ String sql = format("select %s from %s", columns, TABLE_SOURCE);
+ CloseableIterator<Row> collected = tableEnv.executeSql(sql).collect();
+ List<Row> result = CollectionUtil.iteratorToList(collected);
+ assertThat(result).isEqualTo(ROW);
+ }
+ }
+
+ private void createFlinkTable(TableEnvironment tableEnv) {
+ tableEnv.executeSql(TABLE.getCreateQueryForFlink(getMetadata(), TABLE_SOURCE));
+ tableEnv.executeSql(TABLE.getCreateQueryForFlink(getMetadata(), TABLE_SINK));
+ }
+
+ private void prepareData(TableEnvironment tableEnv) {
+ Table dataTable = tableEnv.fromValues(DataTypes.ROW(TABLE.getTableDataFields()), ROW);
+ tableEnv.createTemporaryView(TABLE_DATA, dataTable);
+ }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSinkITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSinkITCase.java
index 1973a185..052a76e1 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSinkITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSinkITCase.java
@@ -213,6 +213,9 @@ void testReal() throws Exception {
@Test
void testUpsert() throws Exception {
+ if (!getMetadata().supportUpdate()) {
+ return;
+ }
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
@@ -322,6 +325,9 @@ void testBatchSink() throws Exception {
@Test
void testReadingFromChangelogSource() throws Exception {
+ if (!getMetadata().supportUpdate()) {
+ return;
+ }
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
String dataId = TestValuesTableFactory.registerData(TestData.userChangelog());
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseMetadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseMetadata.java
index 30ba9cc3..363e5f64 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseMetadata.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseMetadata.java
@@ -44,6 +44,10 @@ public interface DatabaseMetadata extends Serializable {
String getVersion();
+ default boolean supportUpdate() {
+ return true;
+ }
+
default SerializableSupplier<XADataSource> getXaSourceSupplier() {
return this::buildXaDataSource;
}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/clickhouse/ClickHouseDatabase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/clickhouse/ClickHouseDatabase.java
new file mode 100644
index 00000000..50a3f56d
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/clickhouse/ClickHouseDatabase.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.testutils.databases.clickhouse;
+
+import org.apache.flink.connector.jdbc.testutils.DatabaseExtension;
+import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.testcontainers.containers.ClickHouseContainer;
+
+/** A ClickHouse database for testing. */
+public class ClickHouseDatabase extends DatabaseExtension implements ClickHouseImages {
+
+ private static final ClickHouseContainer CONTAINER =
+ new ClickHouseContainer(CLICKHOUSE_IMAGE_23);
+
+ private static ClickHouseMetadata metadata;
+
+ public static ClickHouseMetadata getMetadata() {
+ if (!CONTAINER.isRunning()) {
+ throw new FlinkRuntimeException("Container is stopped.");
+ }
+ if (metadata == null) {
+ metadata = new ClickHouseMetadata(CONTAINER, false);
+ }
+ return metadata;
+ }
+
+ @Override
+ protected DatabaseMetadata startDatabase() throws Exception {
+ CONTAINER.start();
+ return getMetadata();
+ }
+
+ @Override
+ protected void stopDatabase() throws Exception {
+ CONTAINER.stop();
+ metadata = null;
+ }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/clickhouse/ClickHouseImages.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/clickhouse/ClickHouseImages.java
new file mode 100644
index 00000000..64d09724
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/clickhouse/ClickHouseImages.java
@@ -0,0 +1,13 @@
+package org.apache.flink.connector.jdbc.testutils.databases.clickhouse;
+
+import org.testcontainers.utility.DockerImageName;
+
+/** ClickHouse docker images. */
+public interface ClickHouseImages {
+
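+ // Testcontainers' ClickHouseContainer defaults to the legacy yandex/clickhouse-server
+ // image name, so the official image is declared as a compatible substitute.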
+ DockerImageName CLICKHOUSE_IMAGE_23 =
+ DockerImageName.parse("clickhouse/clickhouse-server:23.4.2")
+ .asCompatibleSubstituteFor("yandex/clickhouse-server");
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/clickhouse/ClickHouseMetadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/clickhouse/ClickHouseMetadata.java
new file mode 100644
index 00000000..b0862517
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/clickhouse/ClickHouseMetadata.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.testutils.databases.clickhouse;
+
+import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+
+import org.testcontainers.containers.ClickHouseContainer;
+
+import javax.sql.XADataSource;
+
+/** ClickHouse metadata. */
+public class ClickHouseMetadata implements DatabaseMetadata {
+
+ private final String username;
+ private final String password;
+ private final String url;
+ private final String driver;
+ private final String version;
+ private final boolean xaEnabled;
+
+ public ClickHouseMetadata(ClickHouseContainer container) {
+ this(container, false);
+ }
+
+ public ClickHouseMetadata(ClickHouseContainer container, boolean hasXaEnabled) {
+ this.username = container.getUsername();
+ this.password = container.getPassword();
+ this.url = container.getJdbcUrl();
+ this.driver = container.getDriverClassName();
+ this.version = container.getDockerImageName();
+ this.xaEnabled = hasXaEnabled;
+ }
+
+ @Override
+ public String getJdbcUrl() {
+ return this.url;
+ }
+
+ @Override
+ public String getJdbcUrlWithCredentials() {
+ return String.format("%s?user=%s&password=%s", getJdbcUrl(), getUsername(), getPassword());
+ }
+
+ @Override
+ public String getUsername() {
+ return this.username;
+ }
+
+ @Override
+ public String getPassword() {
+ return this.password;
+ }
+
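+ // The ClickHouse JDBC driver provides no XADataSource, so XA/exactly-once
+ // delivery is not available for this database.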
+ @Override
+ public XADataSource buildXaDataSource() {
+ return null;
+ }
+
+ @Override
+ public String getDriverClass() {
+ return this.driver;
+ }
+
+ @Override
+ public String getVersion() {
+ return this.version;
+ }
+
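+ // ClickHouse MergeTree tables have no standard UPDATE support, so update-based tests are skipped.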
+ @Override
+ public boolean supportUpdate() {
+ return false;
+ }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBase.java
index d8cbd793..8599afa1 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBase.java
@@ -69,7 +69,7 @@ public String getTableName() {
return name;
}
- private Stream<TableField> getStreamFields() {
+ protected Stream<TableField> getStreamFields() {
return Arrays.stream(this.fields);
}
@@ -291,4 +291,8 @@ protected <T> T getNullable(ResultSet rs, FunctionWithException<ResultSet, T, S
protected <T> T getNullable(ResultSet rs, T value) throws SQLException {
return rs.wasNull() ? null : value;
}
+
+ public TableField[] getFields() {
+ return fields;
+ }
}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBuilder.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBuilder.java
index 6a8f80d3..18838cfa 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBuilder.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBuilder.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.jdbc.testutils.tables;
+import org.apache.flink.connector.jdbc.databases.clickhouse.table.ClickhouseTableRow;
import org.apache.flink.table.types.DataType;
/** Table builder. * */
@@ -51,4 +52,8 @@ private static TableField createField(
String name, TableField.DbType dbType, DataType dataType, boolean pkField) {
return new TableField(name, dataType, dbType, pkField);
}
+
+ public static ClickhouseTableRow ckTableRow(String name, TableField... fields) {
+ return new ClickhouseTableRow(name, fields);
+ }
}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/utils/JdbcTypeUtilTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/utils/JdbcTypeUtilTest.java
index b4c67209..6536f9d0 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/utils/JdbcTypeUtilTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/utils/JdbcTypeUtilTest.java
@@ -35,7 +35,6 @@ class JdbcTypeUtilTest {
void testTypeConversions() {
assertThat(logicalTypeToSqlType(LogicalTypeRoot.INTEGER)).isEqualTo(Types.INTEGER);
testUnsupportedType(LogicalTypeRoot.RAW);
- testUnsupportedType(LogicalTypeRoot.MAP);
}
private static void testUnsupportedType(LogicalTypeRoot logicalTypeRoot) {