[Documentation change, JDBC connector page: the sentence "Flink supports connecting to several databases through dialects, such as MySQL, Oracle, PostgreSQL, and SQL Server" now also names CrateDB, and the data type mapping table gains a "CrateDB type" column between "PostgreSQL type" and "SQL Server type". The table's HTML markup was garbled in extraction; the CrateDB column added by these hunks maps as follows, where "(none)" stands for a cell left empty in the table:]

| CrateDB type | Flink SQL type |
|---|---|
| (none) | TINYINT |
| SMALLINT, SHORT | SMALLINT |
| INTEGER, INT | INT |
| BIGINT, LONG | BIGINT |
| (none) | DECIMAL(20, 0) |
| REAL, FLOAT | FLOAT |
| DOUBLE, DOUBLE PRECISION | DOUBLE |
| NUMERIC(p, s) | DECIMAL(p, s) |
| BOOLEAN | BOOLEAN |
| DATE (only in expressions, not as a stored type) | DATE |
| TIME (only in expressions, not as a stored type) | TIME [(p)] [WITHOUT TIMEZONE] |
| TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
| CHAR(n), CHARACTER(n), VARCHAR(n), CHARACTER VARYING(n), TEXT, STRING | STRING |
| (none) | BYTES |
| ARRAY | ARRAY |
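These mappings drive schema derivation when CrateDB tables are read through the connector. As a usage sketch (table name, host, and credentials are hypothetical, not part of this change), reading a CrateDB table through the plain JDBC connector looks like:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CrateDBTableExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // The "jdbc:crate:" URL prefix routes this table to the new CrateDB dialect
        // (see CrateDBDialectFactory below).
        tEnv.executeSql(
                "CREATE TABLE metrics ("
                        + "  id BIGINT,"
                        + "  name STRING"
                        + ") WITH ("
                        + "  'connector' = 'jdbc',"
                        + "  'url' = 'jdbc:crate://localhost:5432/crate',"
                        + "  'table-name' = 'doc.metrics',"
                        + "  'username' = 'crate'"
                        + ")");
        tEnv.executeSql("SELECT id, name FROM metrics").print();
    }
}
```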
diff --git a/flink-connector-jdbc/pom.xml b/flink-connector-jdbc/pom.xml
index 793aee55..e6c08d61 100644
--- a/flink-connector-jdbc/pom.xml
+++ b/flink-connector-jdbc/pom.xml
@@ -55,7 +55,6 @@ under the License.
-
         <dependency>
             <groupId>org.postgresql</groupId>
             <artifactId>postgresql</artifactId>
@@ -79,6 +78,20 @@ under the License.
             <scope>provided</scope>
         </dependency>
+
+        <dependency>
+            <groupId>io.crate</groupId>
+            <artifactId>crate-jdbc</artifactId>
+            <version>2.7.0</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>net.java.dev.jna</groupId>
+                    <artifactId>jna</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
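A note on the `provided` scope: the crate-jdbc driver must be supplied on the runtime classpath by the user. A quick sanity-check sketch, using the driver class name that `CrateDBDialect` (below) reports as its default:

```java
public class CrateDriverCheck {
    public static void main(String[] args) throws ClassNotFoundException {
        // Fails fast with ClassNotFoundException if the provided-scope jar is absent.
        Class.forName("io.crate.client.jdbc.CrateDriver");
        System.out.println("CrateDB JDBC driver found on the classpath");
    }
}
```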
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java
index a5c6c1e8..84ac0abd 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java
@@ -18,6 +18,8 @@
package org.apache.flink.connector.jdbc.catalog;
+import org.apache.flink.connector.jdbc.databases.cratedb.catalog.CrateDBCatalog;
+import org.apache.flink.connector.jdbc.databases.cratedb.dialect.CrateDBDialect;
import org.apache.flink.connector.jdbc.databases.mysql.catalog.MySqlCatalog;
import org.apache.flink.connector.jdbc.databases.mysql.dialect.MySqlDialect;
import org.apache.flink.connector.jdbc.databases.postgres.catalog.PostgresCatalog;
@@ -52,6 +54,9 @@ public static AbstractJdbcCatalog createCatalog(
if (dialect instanceof PostgresDialect) {
return new PostgresCatalog(
userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
+ } else if (dialect instanceof CrateDBDialect) {
+ return new CrateDBCatalog(
+ userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
} else if (dialect instanceof MySqlDialect) {
return new MySqlCatalog(
userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
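For context, a sketch of how this dispatch is typically reached from SQL, assuming the JDBC catalog factory options used elsewhere in this repo (`type`, `default-database`, `username`, `password`, `base-url`); host and credentials are placeholders:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CrateDBCatalogFromSql {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // The base-url scheme selects CrateDBDialect, which createCatalog() above
        // turns into a CrateDBCatalog.
        tEnv.executeSql(
                "CREATE CATALOG my_crate WITH ("
                        + "  'type' = 'jdbc',"
                        + "  'default-database' = 'crate',"
                        + "  'username' = 'crate',"
                        + "  'password' = 'crate',"
                        + "  'base-url' = 'jdbc:crate://localhost:5432'"
                        + ")");
        tEnv.useCatalog("my_crate");
        tEnv.executeSql("SHOW TABLES").print();
    }
}
```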
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractPostgresCompatibleRowConverter.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractPostgresCompatibleRowConverter.java
new file mode 100644
index 00000000..86c6f16c
--- /dev/null
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractPostgresCompatibleRowConverter.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.converter;
+
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+
+import java.lang.reflect.Array;
+
+/**
+ * Runtime converter responsible for converting between JDBC objects and Flink internal objects
+ * for PostgreSQL-compatible databases.
+ */
+public abstract class AbstractPostgresCompatibleRowConverter<T extends java.sql.Array>
+        extends AbstractJdbcRowConverter {
+
+ private static final long serialVersionUID = 1L;
+
+ protected AbstractPostgresCompatibleRowConverter(RowType rowType) {
+ super(rowType);
+ }
+
+ @Override
+ public JdbcDeserializationConverter createInternalConverter(LogicalType type) {
+ LogicalTypeRoot root = type.getTypeRoot();
+
+ if (root == LogicalTypeRoot.ARRAY) {
+ ArrayType arrayType = (ArrayType) type;
+ return createPostgresArrayConverter(arrayType);
+ } else {
+ return createPrimitiveConverter(type);
+ }
+ }
+
+ @Override
+ protected JdbcSerializationConverter createNullableExternalConverter(LogicalType type) {
+ LogicalTypeRoot root = type.getTypeRoot();
+ if (root == LogicalTypeRoot.ARRAY) {
+            // Note: writing the ARRAY type is not supported by the PostgreSQL-compatible dialects yet.
+ return (val, index, statement) -> {
+ throw new IllegalStateException(
+ String.format(
+ "Writing ARRAY type is not yet supported in JDBC:%s.",
+ converterName()));
+ };
+ } else {
+ return super.createNullableExternalConverter(type);
+ }
+ }
+
+ private JdbcDeserializationConverter createPostgresArrayConverter(ArrayType arrayType) {
+ // Since PGJDBC 42.2.15 (https://github.com/pgjdbc/pgjdbc/pull/1194) bytea[] is wrapped in
+ // primitive byte arrays
+        final Class<?> elementClass =
+ LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType());
+ final JdbcDeserializationConverter elementConverter =
+ createNullableInternalConverter(arrayType.getElementType());
+ return val -> {
+ @SuppressWarnings("unchecked")
+ T pgArray = (T) val;
+ Object[] in = (Object[]) pgArray.getArray();
+ final Object[] array = (Object[]) Array.newInstance(elementClass, in.length);
+ for (int i = 0; i < in.length; i++) {
+ array[i] = elementConverter.deserialize(in[i]);
+ }
+ return new GenericArrayData(array);
+ };
+ }
+
+ // Have its own method so that Postgres can support primitives that super class doesn't support
+ // in the future
+ private JdbcDeserializationConverter createPrimitiveConverter(LogicalType type) {
+ return super.createInternalConverter(type);
+ }
+}
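A hypothetical sketch of the extension point this class introduces (the class name and target database are invented): a converter for another PostgreSQL-wire-compatible driver only has to pin the array type parameter and supply a converter name:

```java
import org.apache.flink.connector.jdbc.converter.AbstractPostgresCompatibleRowConverter;
import org.apache.flink.table.types.logical.RowType;

import org.postgresql.jdbc.PgArray;

/** Hypothetical converter for some other PostgreSQL-wire-compatible database. */
public class MyPgCompatibleRowConverter extends AbstractPostgresCompatibleRowConverter<PgArray> {

    private static final long serialVersionUID = 1L;

    public MyPgCompatibleRowConverter(RowType rowType) {
        super(rowType);
    }

    @Override
    public String converterName() {
        // Surfaces in error messages, e.g. the unsupported ARRAY write above.
        return "MyPgCompatible";
    }
}
```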
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalog.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalog.java
new file mode 100644
index 00000000..8eb0927e
--- /dev/null
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalog.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.cratedb.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.databases.postgres.catalog.PostgresCatalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.commons.compress.utils.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/** Catalog for CrateDB. */
+@Internal
+public class CrateDBCatalog extends PostgresCatalog {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CrateDBCatalog.class);
+
+ public static final String DEFAULT_DATABASE = "crate";
+
+    private static final Set<String> builtinSchemas =
+            new HashSet<String>() {
+ {
+ add("pg_catalog");
+ add("information_schema");
+ add("sys");
+ }
+ };
+
+ public CrateDBCatalog(
+ ClassLoader userClassLoader,
+ String catalogName,
+ String defaultDatabase,
+ String username,
+ String pwd,
+ String baseUrl) {
+ super(
+ userClassLoader,
+ catalogName,
+ defaultDatabase,
+ username,
+ pwd,
+ baseUrl,
+ new CrateDBTypeMapper());
+ }
+
+ // ------ databases ------
+
+ @Override
+    public List<String> listDatabases() throws CatalogException {
+ return ImmutableList.of(DEFAULT_DATABASE);
+ }
+
+ // ------ schemas ------
+
+    protected Set<String> getBuiltinSchemas() {
+ return builtinSchemas;
+ }
+
+ // ------ tables ------
+
+ @Override
+    protected List<String> getPureTables(Connection conn, List<String> schemas)
+            throws SQLException {
+        List<String> tables = Lists.newArrayList();
+
+        // The query selects only table_name, so column index 1 is the table name
+        try (PreparedStatement ps =
+                conn.prepareStatement(
+                        "SELECT table_name FROM information_schema.tables "
+                                + "WHERE table_schema = ? "
+                                + "ORDER BY table_type, table_name")) {
+            for (String schema : schemas) {
+                extractColumnValuesByStatement(ps, 1, null, schema).stream()
+ .map(pureTable -> schema + "." + pureTable)
+ .forEach(tables::add);
+ }
+ return tables;
+ }
+ }
+
+ @Override
+ public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+        List<String> tables;
+ try {
+ tables = listTables(tablePath.getDatabaseName());
+ } catch (DatabaseNotExistException e) {
+ return false;
+ }
+
+ String searchPath =
+ extractColumnValuesBySQL(baseUrl + DEFAULT_DATABASE, "show search_path", 1, null)
+ .get(0);
+ String[] schemas = searchPath.split("\\s*,\\s*");
+
+ if (tables.contains(getSchemaTableName(tablePath))) {
+ return true;
+ }
+ for (String schema : schemas) {
+ if (tables.contains(schema + "." + tablePath.getObjectName())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ protected String getTableName(ObjectPath tablePath) {
+ return CrateDBTablePath.fromFlinkTableName(tablePath.getObjectName()).getPgTableName();
+ }
+
+ @Override
+ protected String getSchemaName(ObjectPath tablePath) {
+ return CrateDBTablePath.fromFlinkTableName(tablePath.getObjectName()).getPgSchemaName();
+ }
+
+ @Override
+ protected String getSchemaTableName(ObjectPath tablePath) {
+ return CrateDBTablePath.fromFlinkTableName(tablePath.getObjectName()).getFullPath();
+ }
+}
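A sketch of the lookup semantics implemented above (connection details are placeholder assumptions): unqualified table names are resolved against the `doc` schema first and then against each entry on CrateDB's `search_path`:

```java
import org.apache.flink.connector.jdbc.databases.cratedb.catalog.CrateDBCatalog;
import org.apache.flink.table.catalog.ObjectPath;

public class CrateDBLookupDemo {
    public static void main(String[] args) {
        CrateDBCatalog catalog =
                new CrateDBCatalog(
                        Thread.currentThread().getContextClassLoader(),
                        "my_crate",
                        CrateDBCatalog.DEFAULT_DATABASE, // always "crate"
                        "crate",                         // placeholder credentials
                        "crate",
                        "jdbc:crate://localhost:5432/");
        catalog.open();
        // Unqualified names are tried as "doc.t1", then against the search_path:
        System.out.println(catalog.tableExists(new ObjectPath("crate", "t1")));
        // Qualified names are matched literally as "<schema>.<table>":
        System.out.println(catalog.tableExists(new ObjectPath("crate", "test_schema.t3")));
        catalog.close();
    }
}
```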
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBTablePath.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBTablePath.java
new file mode 100644
index 00000000..db6a995f
--- /dev/null
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBTablePath.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.cratedb.catalog;
+
+import org.apache.flink.connector.jdbc.databases.postgres.catalog.PostgresTablePath;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Table path of CrateDB in Flink. Can be of formats "table_name" or "schema_name.table_name". When
+ * it's "table_name", the schema name defaults to "doc".
+ */
+public class CrateDBTablePath extends PostgresTablePath {
+
+ private static final String DEFAULT_CRATE_SCHEMA_NAME = "doc";
+
+ public CrateDBTablePath(String pgSchemaName, String pgTableName) {
+ super(pgSchemaName, pgTableName);
+ }
+
+ public static CrateDBTablePath fromFlinkTableName(String flinkTableName) {
+ if (flinkTableName.contains(".")) {
+ String[] path = flinkTableName.split("\\.");
+
+ checkArgument(
+ path.length == 2,
+ String.format(
+ "Table name '%s' is not valid. The parsed length is %d",
+ flinkTableName, path.length));
+
+ return new CrateDBTablePath(path[0], path[1]);
+ } else {
+ return new CrateDBTablePath(getDefaultSchemaName(), flinkTableName);
+ }
+ }
+
+ public static String toFlinkTableName(String schema, String table) {
+ return new PostgresTablePath(schema, table).getFullPath();
+ }
+
+ protected static String getDefaultSchemaName() {
+ return DEFAULT_CRATE_SCHEMA_NAME;
+ }
+}
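A small sketch of the parsing rule described in the Javadoc:

```java
import org.apache.flink.connector.jdbc.databases.cratedb.catalog.CrateDBTablePath;

public class CrateDBTablePathDemo {
    public static void main(String[] args) {
        // Unqualified Flink table names fall back to CrateDB's "doc" schema:
        System.out.println(CrateDBTablePath.fromFlinkTableName("t1").getFullPath());
        // -> doc.t1
        System.out.println(CrateDBTablePath.fromFlinkTableName("my_schema.t1").getFullPath());
        // -> my_schema.t1
    }
}
```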
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBTypeMapper.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBTypeMapper.java
new file mode 100644
index 00000000..3532a1e5
--- /dev/null
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBTypeMapper.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.cratedb.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.databases.postgres.catalog.PostgresTypeMapper;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** CrateDBTypeMapper util class. */
+@Internal
+public class CrateDBTypeMapper extends PostgresTypeMapper {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CrateDBTypeMapper.class);
+
+    // The CrateDB JDBC driver uses a mapping very similar to the
+    // PostgreSQL driver's, and adds some extras:
+ private static final String PG_STRING = "string";
+ private static final String PG_STRING_ARRAY = "_string";
+
+ @Override
+ protected DataType getMapping(String pgType, int precision, int scale) {
+ switch (pgType) {
+ case PG_SERIAL:
+ case PG_BIGSERIAL:
+ return null;
+ case PG_STRING:
+ return DataTypes.STRING();
+ case PG_STRING_ARRAY:
+ return DataTypes.ARRAY(DataTypes.STRING());
+ default:
+ return super.getMapping(pgType, precision, scale);
+ }
+ }
+
+ @Override
+ protected String getDBType() {
+ return "CrateDB";
+ }
+}
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBDialect.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBDialect.java
new file mode 100644
index 00000000..7592cf2e
--- /dev/null
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBDialect.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.cratedb.dialect;
+
+import org.apache.flink.connector.jdbc.dialect.AbstractPostgresCompatibleDialect;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Optional;
+
+/** JDBC dialect for CrateDB. */
+public class CrateDBDialect extends AbstractPostgresCompatibleDialect {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String dialectName() {
+ return "CrateDB";
+ }
+
+ @Override
+ public CrateDBRowConverter getRowConverter(RowType rowType) {
+ return new CrateDBRowConverter(rowType);
+ }
+
+ @Override
+    public Optional<String> defaultDriverName() {
+ return Optional.of("io.crate.client.jdbc.CrateDriver");
+ }
+}
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBDialectFactory.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBDialectFactory.java
new file mode 100644
index 00000000..ac5a8e7d
--- /dev/null
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBDialectFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.cratedb.dialect;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;
+
+/** Factory for {@link CrateDBDialect}. */
+@Internal
+public class CrateDBDialectFactory implements JdbcDialectFactory {
+ @Override
+ public boolean acceptsURL(String url) {
+ return url.startsWith("jdbc:crate:");
+ }
+
+ @Override
+ public JdbcDialect create() {
+ return new CrateDBDialect();
+ }
+}
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBRowConverter.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBRowConverter.java
new file mode 100644
index 00000000..1412b0d5
--- /dev/null
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBRowConverter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.cratedb.dialect;
+
+import org.apache.flink.connector.jdbc.converter.AbstractPostgresCompatibleRowConverter;
+import org.apache.flink.table.types.logical.RowType;
+
+import io.crate.shade.org.postgresql.jdbc.PgArray;
+
+/**
+ * Runtime converter responsible for converting between JDBC objects and Flink internal objects for
+ * CrateDB.
+ */
+public class CrateDBRowConverter extends AbstractPostgresCompatibleRowConverter<PgArray> {
+
+ private static final long serialVersionUID = 1L;
+
+ public CrateDBRowConverter(RowType rowType) {
+ super(rowType);
+ }
+
+ @Override
+ public String converterName() {
+ return "CrateDB";
+ }
+}
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresCatalog.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresCatalog.java
index 61f43ba8..7ece9b64 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresCatalog.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresCatalog.java
@@ -70,7 +70,7 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
}
};
- private final JdbcDialectTypeMapper dialectTypeMapper;
+ protected final JdbcDialectTypeMapper dialectTypeMapper;
public PostgresCatalog(
ClassLoader userClassLoader,
@@ -79,8 +79,26 @@ public PostgresCatalog(
String username,
String pwd,
String baseUrl) {
+ this(
+ userClassLoader,
+ catalogName,
+ defaultDatabase,
+ username,
+ pwd,
+ baseUrl,
+ new PostgresTypeMapper());
+ }
+
+ protected PostgresCatalog(
+ ClassLoader userClassLoader,
+ String catalogName,
+ String defaultDatabase,
+ String username,
+ String pwd,
+ String baseUrl,
+ JdbcDialectTypeMapper dialectTypeMapper) {
super(userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
- this.dialectTypeMapper = new PostgresTypeMapper();
+ this.dialectTypeMapper = dialectTypeMapper;
}
// ------ databases ------
@@ -95,8 +113,35 @@ public List<String> listDatabases() throws CatalogException {
dbName -> !builtinDatabases.contains(dbName));
}
+ // ------ schemas ------
+
+    protected Set<String> getBuiltinSchemas() {
+ return builtinSchemas;
+ }
+
// ------ tables ------
+    protected List<String> getPureTables(Connection conn, List<String> schemas)
+            throws SQLException {
+        List<String> tables = Lists.newArrayList();
+
+ // position 1 is database name, position 2 is schema name, position 3 is table name
+ try (PreparedStatement ps =
+ conn.prepareStatement(
+ "SELECT * FROM information_schema.tables "
+ + "WHERE table_type = 'BASE TABLE' "
+ + "AND table_schema = ? "
+ + "ORDER BY table_type, table_name;")) {
+ for (String schema : schemas) {
+ // Column index 1 is database name, 2 is schema name, 3 is table name
+ extractColumnValuesByStatement(ps, 3, null, schema).stream()
+ .map(pureTable -> schema + "." + pureTable)
+ .forEach(tables::add);
+ }
+ return tables;
+ }
+ }
+
@Override
    public List<String> listTables(String databaseName)
throws DatabaseNotExistException, CatalogException {
@@ -107,8 +152,6 @@ public List<String> listTables(String databaseName)
throw new DatabaseNotExistException(getName(), databaseName);
}
-        List<String> tables = Lists.newArrayList();
-
final String url = baseUrl + databaseName;
try (Connection conn = DriverManager.getConnection(url, username, pwd)) {
// get all schemas
@@ -117,28 +160,15 @@ public List<String> listTables(String databaseName)
conn.prepareStatement("SELECT schema_name FROM information_schema.schemata;")) {
schemas =
extractColumnValuesByStatement(
- ps, 1, pgSchema -> !builtinSchemas.contains(pgSchema));
+ ps, 1, pgSchema -> !getBuiltinSchemas().contains(pgSchema));
}
// get all tables
- try (PreparedStatement ps =
- conn.prepareStatement(
- "SELECT * FROM information_schema.tables "
- + "WHERE table_type = 'BASE TABLE' "
- + "AND table_schema = ? "
- + "ORDER BY table_type, table_name;")) {
- for (String schema : schemas) {
- // Column index 1 is database name, 2 is schema name, 3 is table name
- extractColumnValuesByStatement(ps, 3, null, schema).stream()
- .map(pureTable -> schema + "." + pureTable)
- .forEach(tables::add);
- }
- }
+ return getPureTables(conn, schemas);
} catch (Exception e) {
throw new CatalogException(
String.format("Failed to list tables for database %s", databaseName), e);
}
- return tables;
}
/**
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresTablePath.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresTablePath.java
index 199a081a..d811e729 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresTablePath.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresTablePath.java
@@ -38,8 +38,12 @@ public class PostgresTablePath {
private final String pgTableName;
public PostgresTablePath(String pgSchemaName, String pgTableName) {
- checkArgument(!StringUtils.isNullOrWhitespaceOnly(pgSchemaName));
- checkArgument(!StringUtils.isNullOrWhitespaceOnly(pgTableName));
+ checkArgument(
+ !StringUtils.isNullOrWhitespaceOnly(pgSchemaName),
+ "Schema name is not valid. Null or empty is not allowed");
+ checkArgument(
+ !StringUtils.isNullOrWhitespaceOnly(pgTableName),
+ "Table name is not valid. Null or empty is not allowed");
this.pgSchemaName = pgSchemaName;
this.pgTableName = pgTableName;
@@ -57,7 +61,7 @@ public static PostgresTablePath fromFlinkTableName(String flinkTableName) {
return new PostgresTablePath(path[0], path[1]);
} else {
- return new PostgresTablePath(DEFAULT_POSTGRES_SCHEMA_NAME, flinkTableName);
+ return new PostgresTablePath(getDefaultSchemaName(), flinkTableName);
}
}
@@ -77,6 +81,10 @@ public String getPgSchemaName() {
return pgSchemaName;
}
+ protected static String getDefaultSchemaName() {
+ return DEFAULT_POSTGRES_SCHEMA_NAME;
+ }
+
@Override
public String toString() {
return getFullPath();
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresTypeMapper.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresTypeMapper.java
index b2769d75..f213128a 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresTypeMapper.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresTypeMapper.java
@@ -50,8 +50,8 @@ public class PostgresTypeMapper implements JdbcDialectTypeMapper {
// boolean <=> bool
// decimal <=> numeric
private static final String PG_SMALLSERIAL = "smallserial";
- private static final String PG_SERIAL = "serial";
- private static final String PG_BIGSERIAL = "bigserial";
+ protected static final String PG_SERIAL = "serial";
+ protected static final String PG_BIGSERIAL = "bigserial";
private static final String PG_BYTEA = "bytea";
private static final String PG_BYTEA_ARRAY = "_bytea";
private static final String PG_SMALLINT = "int2";
@@ -93,6 +93,15 @@ public DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, int co
int precision = metadata.getPrecision(colIndex);
int scale = metadata.getScale(colIndex);
+ DataType dataType = getMapping(pgType, precision, scale);
+ if (dataType == null) {
+ throw new UnsupportedOperationException(
+ String.format("Doesn't support %s type '%s' yet", getDBType(), pgType));
+ }
+ return dataType;
+ }
+
+ protected DataType getMapping(String pgType, int precision, int scale) {
switch (pgType) {
case PG_BOOLEAN:
return DataTypes.BOOLEAN();
@@ -128,14 +137,13 @@ public DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, int co
case PG_NUMERIC:
// see SPARK-26538: handle numeric without explicit precision and scale.
if (precision > 0) {
- return DataTypes.DECIMAL(precision, metadata.getScale(colIndex));
+ return DataTypes.DECIMAL(precision, scale);
}
return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18);
case PG_NUMERIC_ARRAY:
// see SPARK-26538: handle numeric without explicit precision and scale.
if (precision > 0) {
- return DataTypes.ARRAY(
- DataTypes.DECIMAL(precision, metadata.getScale(colIndex)));
+ return DataTypes.ARRAY(DataTypes.DECIMAL(precision, scale));
}
return DataTypes.ARRAY(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18));
case PG_CHAR:
@@ -169,8 +177,11 @@ public DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, int co
case PG_DATE_ARRAY:
return DataTypes.ARRAY(DataTypes.DATE());
default:
- throw new UnsupportedOperationException(
- String.format("Doesn't support Postgres type '%s' yet", pgType));
+ return null;
}
}
+
+ protected String getDBType() {
+ return "Postgres";
+ }
}
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java
index f5b4af24..d0924aee 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java
@@ -19,114 +19,29 @@
package org.apache.flink.connector.jdbc.databases.postgres.dialect;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
-import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
-import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.connector.jdbc.dialect.AbstractPostgresCompatibleDialect;
import org.apache.flink.table.types.logical.RowType;
-import java.util.Arrays;
-import java.util.EnumSet;
import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
-/** JDBC dialect for PostgresSQL. */
+/** JDBC dialect for PostgreSQL. */
@Internal
-public class PostgresDialect extends AbstractDialect {
+public class PostgresDialect extends AbstractPostgresCompatibleDialect {
private static final long serialVersionUID = 1L;
- // Define MAX/MIN precision of TIMESTAMP type according to PostgreSQL docs:
- // https://www.postgresql.org/docs/12/datatype-datetime.html
- private static final int MAX_TIMESTAMP_PRECISION = 6;
- private static final int MIN_TIMESTAMP_PRECISION = 1;
-
- // Define MAX/MIN precision of DECIMAL type according to PostgreSQL docs:
- // https://www.postgresql.org/docs/12/datatype-numeric.html#DATATYPE-NUMERIC-DECIMAL
- private static final int MAX_DECIMAL_PRECISION = 1000;
- private static final int MIN_DECIMAL_PRECISION = 1;
-
@Override
- public JdbcRowConverter getRowConverter(RowType rowType) {
+ public PostgresRowConverter getRowConverter(RowType rowType) {
return new PostgresRowConverter(rowType);
}
- @Override
- public String getLimitClause(long limit) {
- return "LIMIT " + limit;
- }
-
@Override
    public Optional<String> defaultDriverName() {
return Optional.of("org.postgresql.Driver");
}
- /** Postgres upsert query. It use ON CONFLICT ... DO UPDATE SET.. to replace into Postgres. */
- @Override
-    public Optional<String> getUpsertStatement(
- String tableName, String[] fieldNames, String[] uniqueKeyFields) {
- String uniqueColumns =
- Arrays.stream(uniqueKeyFields)
- .map(this::quoteIdentifier)
- .collect(Collectors.joining(", "));
- String updateClause =
- Arrays.stream(fieldNames)
- .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
- .collect(Collectors.joining(", "));
- return Optional.of(
- getInsertIntoStatement(tableName, fieldNames)
- + " ON CONFLICT ("
- + uniqueColumns
- + ")"
- + " DO UPDATE SET "
- + updateClause);
- }
-
- @Override
- public String quoteIdentifier(String identifier) {
- return identifier;
- }
-
@Override
public String dialectName() {
return "PostgreSQL";
}
-
- @Override
-    public Optional<Range> decimalPrecisionRange() {
- return Optional.of(Range.of(MIN_DECIMAL_PRECISION, MAX_DECIMAL_PRECISION));
- }
-
- @Override
-    public Optional<Range> timestampPrecisionRange() {
- return Optional.of(Range.of(MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION));
- }
-
- @Override
-    public Set<LogicalTypeRoot> supportedTypes() {
- // The data types used in PostgreSQL are list at:
- // https://www.postgresql.org/docs/12/datatype.html
-
- // TODO: We can't convert BINARY data type to
- // PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO in
- // LegacyTypeInfoDataTypeConverter.
-
- return EnumSet.of(
- LogicalTypeRoot.CHAR,
- LogicalTypeRoot.VARCHAR,
- LogicalTypeRoot.BOOLEAN,
- LogicalTypeRoot.VARBINARY,
- LogicalTypeRoot.DECIMAL,
- LogicalTypeRoot.TINYINT,
- LogicalTypeRoot.SMALLINT,
- LogicalTypeRoot.INTEGER,
- LogicalTypeRoot.BIGINT,
- LogicalTypeRoot.FLOAT,
- LogicalTypeRoot.DOUBLE,
- LogicalTypeRoot.DATE,
- LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE,
- LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
- LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
- LogicalTypeRoot.ARRAY);
- }
}
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresRowConverter.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresRowConverter.java
index 4f302cc1..2e6e14c2 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresRowConverter.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresRowConverter.java
@@ -18,84 +18,25 @@
package org.apache.flink.connector.jdbc.databases.postgres.dialect;
-import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter;
-import org.apache.flink.table.data.GenericArrayData;
-import org.apache.flink.table.types.logical.ArrayType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.connector.jdbc.converter.AbstractPostgresCompatibleRowConverter;
import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
import org.postgresql.jdbc.PgArray;
-import java.lang.reflect.Array;
-
/**
* Runtime converter that responsible to convert between JDBC object and Flink internal object for
* PostgreSQL.
*/
-public class PostgresRowConverter extends AbstractJdbcRowConverter {
+public class PostgresRowConverter extends AbstractPostgresCompatibleRowConverter<PgArray> {
private static final long serialVersionUID = 1L;
- @Override
- public String converterName() {
- return "PostgreSQL";
- }
-
public PostgresRowConverter(RowType rowType) {
super(rowType);
}
@Override
- public JdbcDeserializationConverter createInternalConverter(LogicalType type) {
- LogicalTypeRoot root = type.getTypeRoot();
-
- if (root == LogicalTypeRoot.ARRAY) {
- ArrayType arrayType = (ArrayType) type;
- return createPostgresArrayConverter(arrayType);
- } else {
- return createPrimitiveConverter(type);
- }
- }
-
- @Override
- protected JdbcSerializationConverter createNullableExternalConverter(LogicalType type) {
- LogicalTypeRoot root = type.getTypeRoot();
- if (root == LogicalTypeRoot.ARRAY) {
- // note:Writing ARRAY type is not yet supported by PostgreSQL dialect now.
- return (val, index, statement) -> {
- throw new IllegalStateException(
- String.format(
- "Writing ARRAY type is not yet supported in JDBC:%s.",
- converterName()));
- };
- } else {
- return super.createNullableExternalConverter(type);
- }
- }
-
- private JdbcDeserializationConverter createPostgresArrayConverter(ArrayType arrayType) {
- // Since PGJDBC 42.2.15 (https://github.com/pgjdbc/pgjdbc/pull/1194) bytea[] is wrapped in
- // primitive byte arrays
-        final Class<?> elementClass =
- LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType());
- final JdbcDeserializationConverter elementConverter =
- createNullableInternalConverter(arrayType.getElementType());
- return val -> {
- PgArray pgArray = (PgArray) val;
- Object[] in = (Object[]) pgArray.getArray();
- final Object[] array = (Object[]) Array.newInstance(elementClass, in.length);
- for (int i = 0; i < in.length; i++) {
- array[i] = elementConverter.deserialize(in[i]);
- }
- return new GenericArrayData(array);
- };
- }
-
- // Have its own method so that Postgres can support primitives that super class doesn't support
- // in the future
- private JdbcDeserializationConverter createPrimitiveConverter(LogicalType type) {
- return super.createInternalConverter(type);
+ public String converterName() {
+ return "PostgreSQL";
}
}
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java
new file mode 100644
index 00000000..0ca425f3
--- /dev/null
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect;
+
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** JDBC dialect for PostgreSQL compatible databases. */
+public abstract class AbstractPostgresCompatibleDialect extends AbstractDialect {
+
+ private static final long serialVersionUID = 1L;
+
+ // Define MAX/MIN precision of TIMESTAMP type according to PostgreSQL docs:
+ // https://www.postgresql.org/docs/12/datatype-datetime.html
+ private static final int MAX_TIMESTAMP_PRECISION = 6;
+ private static final int MIN_TIMESTAMP_PRECISION = 1;
+
+ // Define MAX/MIN precision of DECIMAL type according to PostgreSQL docs:
+ // https://www.postgresql.org/docs/12/datatype-numeric.html#DATATYPE-NUMERIC-DECIMAL
+ private static final int MAX_DECIMAL_PRECISION = 1000;
+ private static final int MIN_DECIMAL_PRECISION = 1;
+
+ @Override
+ public String getLimitClause(long limit) {
+ return "LIMIT " + limit;
+ }
+
+    /** Postgres upsert query. It uses "ON CONFLICT ... DO UPDATE SET ..." to upsert into Postgres-compatible databases. */
+ @Override
+    public Optional<String> getUpsertStatement(
+ String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+ String uniqueColumns =
+ Arrays.stream(uniqueKeyFields)
+ .map(this::quoteIdentifier)
+ .collect(Collectors.joining(", "));
+ String updateClause =
+ Arrays.stream(fieldNames)
+ .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
+ .collect(Collectors.joining(", "));
+ return Optional.of(
+ getInsertIntoStatement(tableName, fieldNames)
+ + " ON CONFLICT ("
+ + uniqueColumns
+ + ")"
+ + " DO UPDATE SET "
+ + updateClause);
+ }
+
+ @Override
+ public String quoteIdentifier(String identifier) {
+ return identifier;
+ }
+
+ @Override
+    public Optional<Range> decimalPrecisionRange() {
+ return Optional.of(Range.of(MIN_DECIMAL_PRECISION, MAX_DECIMAL_PRECISION));
+ }
+
+ @Override
+    public Optional<Range> timestampPrecisionRange() {
+ return Optional.of(Range.of(MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION));
+ }
+
+ @Override
+    public Set<LogicalTypeRoot> supportedTypes() {
+        // The data types used in PostgreSQL are listed at:
+ // https://www.postgresql.org/docs/12/datatype.html
+
+ // TODO: We can't convert BINARY data type to
+ // PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO in
+ // LegacyTypeInfoDataTypeConverter.
+
+ return EnumSet.of(
+ LogicalTypeRoot.CHAR,
+ LogicalTypeRoot.VARCHAR,
+ LogicalTypeRoot.BOOLEAN,
+ LogicalTypeRoot.VARBINARY,
+ LogicalTypeRoot.DECIMAL,
+ LogicalTypeRoot.TINYINT,
+ LogicalTypeRoot.SMALLINT,
+ LogicalTypeRoot.INTEGER,
+ LogicalTypeRoot.BIGINT,
+ LogicalTypeRoot.FLOAT,
+ LogicalTypeRoot.DOUBLE,
+ LogicalTypeRoot.DATE,
+ LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE,
+ LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
+ LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
+ LogicalTypeRoot.ARRAY);
+ }
+}
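A sketch of what `getUpsertStatement` produces; the exact `INSERT` prefix comes from the base dialect's `getInsertIntoStatement`, so the printed statement is approximate:

```java
import org.apache.flink.connector.jdbc.databases.cratedb.dialect.CrateDBDialect;

public class UpsertStatementDemo {
    public static void main(String[] args) {
        // Prints roughly:
        // INSERT INTO t(id, name) VALUES (:id, :name)
        //     ON CONFLICT (id) DO UPDATE SET id=EXCLUDED.id, name=EXCLUDED.name
        System.out.println(
                new CrateDBDialect()
                        .getUpsertStatement(
                                "t", new String[] {"id", "name"}, new String[] {"id"})
                        .orElseThrow(IllegalStateException::new));
    }
}
```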
diff --git a/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory b/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory
index c0f0c966..5e381098 100644
--- a/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory
+++ b/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory
@@ -19,3 +19,5 @@ org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresDialectFactor
org.apache.flink.connector.jdbc.databases.oracle.dialect.OracleDialectFactory
org.apache.flink.connector.jdbc.databases.sqlserver.dialect.SqlServerDialectFactory
org.apache.flink.connector.jdbc.databases.clickhouse.dialect.ClickHouseDialectFactory
+org.apache.flink.connector.jdbc.databases.cratedb.dialect.CrateDBDialectFactory
+
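The SPI entry above is what makes the new dialect discoverable. A minimal sketch of locating it through the standard `ServiceLoader` mechanism:

```java
import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;

import java.util.ServiceLoader;

public class DialectDiscoveryDemo {
    public static void main(String[] args) {
        for (JdbcDialectFactory factory : ServiceLoader.load(JdbcDialectFactory.class)) {
            if (factory.acceptsURL("jdbc:crate://localhost:5432/crate")) {
                System.out.println(factory.create().dialectName()); // prints "CrateDB"
            }
        }
    }
}
```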
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalogITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalogITCase.java
new file mode 100644
index 00000000..11bf61d0
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalogITCase.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.cratedb.catalog;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.apache.flink.connector.jdbc.databases.cratedb.catalog.CrateDBCatalog.DEFAULT_DATABASE;
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** E2E test for {@link CrateDBCatalog}. */
+class CrateDBCatalogITCase extends CrateDBCatalogTestBase {
+
+ private TableEnvironment tEnv;
+
+ @BeforeEach
+ void setup() {
+ this.tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+ tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+
+ // use CrateDB catalog
+ tEnv.registerCatalog(TEST_CATALOG_NAME, catalog);
+ tEnv.useCatalog(TEST_CATALOG_NAME);
+ }
+
+ @Test
+ void testSelectField() {
+        List<Row> results =
+ CollectionUtil.iteratorToList(
+ tEnv.sqlQuery(String.format("select id from %s", TABLE1))
+ .execute()
+ .collect());
+ assertThat(results).hasToString("[+I[1]]");
+ }
+
+ @Test
+ void testWithoutSchema() {
+        List<Row> results =
+ CollectionUtil.iteratorToList(
+ tEnv.sqlQuery(String.format("select * from %s", TABLE1))
+ .execute()
+ .collect());
+ assertThat(results).hasToString("[+I[1]]");
+ }
+
+ @Test
+ void testWithSchema() {
+        List<Row> results =
+ CollectionUtil.iteratorToList(
+ tEnv.sqlQuery(
+ String.format(
+ "select * from `%s`",
+ CrateDBTablePath.fromFlinkTableName(TABLE1)))
+ .execute()
+ .collect());
+ assertThat(results).hasToString("[+I[1]]");
+ }
+
+ @Test
+ void testFullPath() {
+        List<Row> results =
+ CollectionUtil.iteratorToList(
+ tEnv.sqlQuery(
+ String.format(
+ "select * from %s.%s.`%s`",
+ TEST_CATALOG_NAME,
+ DEFAULT_DATABASE,
+ CrateDBTablePath.fromFlinkTableName(TABLE1)))
+ .execute()
+ .collect());
+ assertThat(results).hasToString("[+I[1]]");
+ }
+
+ @Test
+ void testGroupByInsert() throws Exception {
+ tEnv.executeSql(
+ String.format(
+ "insert into `%s`"
+ + "select `int`, `short`, max(`long`), max(`real`), "
+ + "max(`double`), max(`boolean`), "
+ + "max(`text`), max(`timestamp`) "
+ + "from `%s` group by `int`, `short`",
+ TABLE_TARGET_PRIMITIVE, TABLE_PRIMITIVE_TYPE))
+ .await();
+ executeSQL(String.format("REFRESH TABLE doc.%s", TABLE_TARGET_PRIMITIVE));
+        List<Row> results =
+ CollectionUtil.iteratorToList(
+ tEnv.sqlQuery(String.format("select * from `%s`", TABLE_TARGET_PRIMITIVE))
+ .execute()
+ .collect());
+ assertThat(results)
+ .hasToString("[+I[1, 3, 4, 5.5, 6.6, true, b, 2016-06-22T19:10:25.123]]");
+ }
+
+ @Test
+ void testPrimitiveTypes() {
+        List<Row> results =
+ CollectionUtil.iteratorToList(
+ tEnv.sqlQuery(String.format("select * from %s", TABLE_PRIMITIVE_TYPE))
+ .execute()
+ .collect());
+
+ assertThat(results)
+ .hasToString(
+ "[+I[1, 3, 3, 4, 4, 5.5, 5.5, 6.6, 6.6, true, a, b, c, d , e, 192.168.0.100, 2016-06-22T19:10:25.123]]");
+ }
+
+ @Test
+ void testArrayTypes() {
+        List<Row> results =
+ CollectionUtil.iteratorToList(
+ tEnv.sqlQuery(String.format("select * from %s", TABLE_ARRAY_TYPE))
+ .execute()
+ .collect());
+
+ assertThat(results)
+ .hasToString(
+ "[+I[[1, 2, 3], [3, 4, 5], [3, 4, 5], [4, 5, 6], [4, 5, 6], [5.5, 6.6, 7.7], [5.5, 6.6, 7.7], [6.6, 7.7, 8.8], [6.6, 7.7, 8.8], [true, false, true], [a, b, c], [a, b, c], [b, c, d], [b , c , d ], [b, c, d], [0:0:0:0:0:ffff:c0a8:64, 10.2.5.28, 127.0.0.6], [2016-06-22T19:10:25.123, 2019-06-22T11:22:33.987], null]]");
+ }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalogTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalogTest.java
new file mode 100644
index 00000000..fbf5916a
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalogTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.cratedb.catalog;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link CrateDBCatalog}. */
+class CrateDBCatalogTest extends CrateDBCatalogTestBase {
+
+ // ------ databases ------
+
+ @Test
+ void testGetDb_DatabaseNotExistException() {
+ assertThatThrownBy(() -> catalog.getDatabase("nonexistent"))
+ .isInstanceOf(DatabaseNotExistException.class)
+ .hasMessageContaining("Database nonexistent does not exist in Catalog");
+ }
+
+ @Test
+ void testListDatabases() {
+ Assertions.assertThat(catalog.listDatabases()).containsExactly("crate");
+ }
+
+ @Test
+ void testDbExists() {
+ Assertions.assertThat(catalog.databaseExists("nonexistent")).isFalse();
+ Assertions.assertThat(catalog.databaseExists(CrateDBCatalog.DEFAULT_DATABASE)).isTrue();
+ }
+
+ // ------ tables ------
+
+ @Test
+ void testListTables() throws DatabaseNotExistException {
+        List<String> actual = catalog.listTables(CrateDBCatalog.DEFAULT_DATABASE);
+
+ assertThat(actual)
+ .containsExactly(
+ "doc.array_table",
+ "doc.primitive_table",
+ "doc.t1",
+ "doc.t2",
+ "doc.target_primitive_table",
+ "test_schema.t3");
+ }
+
+ @Test
+ void testListTables_DatabaseNotExistException() {
+ assertThatThrownBy(() -> catalog.listTables("CrateDB"))
+ .isInstanceOf(DatabaseNotExistException.class);
+ }
+
+ @Test
+ void testTableExists() {
+ Assertions.assertThat(
+ catalog.tableExists(
+ new ObjectPath(CrateDBCatalog.DEFAULT_DATABASE, "nonexist")))
+ .isFalse();
+
+ Assertions.assertThat(
+ catalog.tableExists(
+ new ObjectPath(CrateDBCatalog.DEFAULT_DATABASE, TABLE1)))
+ .isTrue();
+ Assertions.assertThat(
+ catalog.tableExists(
+ new ObjectPath(CrateDBCatalog.DEFAULT_DATABASE, "test_schema.t3")))
+ .isTrue();
+ }
+
+ @Test
+ void testGetTables_TableNotExistException() {
+ assertThatThrownBy(
+ () ->
+ catalog.getTable(
+ new ObjectPath(
+ CrateDBCatalog.DEFAULT_DATABASE,
+ CrateDBTablePath.toFlinkTableName(
+ TEST_SCHEMA, "anytable"))))
+ .isInstanceOf(TableNotExistException.class);
+ }
+
+ @Test
+ void testGetTables_TableNotExistException_NoSchema() {
+ assertThatThrownBy(
+ () ->
+ catalog.getTable(
+ new ObjectPath(
+ CrateDBCatalog.DEFAULT_DATABASE,
+ CrateDBTablePath.toFlinkTableName(
+ "nonexistschema", "anytable"))))
+ .isInstanceOf(TableNotExistException.class);
+ }
+
+ @Test
+ void testGetTables_TableNotExistException_NoDb() {
+ assertThatThrownBy(
+ () ->
+ catalog.getTable(
+ new ObjectPath(
+ "nonexistdb",
+ CrateDBTablePath.toFlinkTableName(
+ TEST_SCHEMA, "anytable"))))
+ .isInstanceOf(TableNotExistException.class);
+ }
+
+ @Test
+ void testGetTable() throws TableNotExistException {
+ // test crate.doc.t1
+ Schema schema = getSimpleTable().schema;
+ CatalogBaseTable table =
+ catalog.getTable(new ObjectPath(CrateDBCatalog.DEFAULT_DATABASE, TABLE1));
+
+ assertThat(table.getUnresolvedSchema()).isEqualTo(schema);
+
+ table = catalog.getTable(new ObjectPath(CrateDBCatalog.DEFAULT_DATABASE, "doc.t1"));
+
+ assertThat(table.getUnresolvedSchema()).isEqualTo(schema);
+
+ table = catalog.getTable(new ObjectPath(CrateDBCatalog.DEFAULT_DATABASE, TABLE2));
+
+ assertThat(table.getUnresolvedSchema()).isEqualTo(schema);
+
+ table = catalog.getTable(new ObjectPath(CrateDBCatalog.DEFAULT_DATABASE, "doc.t2"));
+
+ assertThat(table.getUnresolvedSchema()).isEqualTo(schema);
+
+        // test crate.test_schema.t3
+ table =
+ catalog.getTable(
+ new ObjectPath(CrateDBCatalog.DEFAULT_DATABASE, TEST_SCHEMA + ".t3"));
+
+ assertThat(table.getUnresolvedSchema()).isEqualTo(schema);
+ }
+
+ @Test
+ void testPrimitiveDataTypes() throws TableNotExistException {
+ CatalogBaseTable table =
+ catalog.getTable(
+ new ObjectPath(CrateDBCatalog.DEFAULT_DATABASE, TABLE_PRIMITIVE_TYPE));
+
+ assertThat(table.getUnresolvedSchema())
+ .hasToString(sanitizeSchemaSQL(getPrimitiveTable().schema.toString()));
+ }
+
+ @Test
+ void testArrayDataTypes() throws TableNotExistException {
+ CatalogBaseTable table =
+ catalog.getTable(new ObjectPath(CrateDBCatalog.DEFAULT_DATABASE, TABLE_ARRAY_TYPE));
+
+ assertThat(table.getUnresolvedSchema())
+ .hasToString(sanitizeSchemaSQL(getArrayTable().schema.toString()));
+ }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalogTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalogTestBase.java
new file mode 100644
index 00000000..9a9fdd09
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalogTestBase.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.cratedb.catalog;
+
+import org.apache.flink.connector.jdbc.testutils.JdbcITCaseBase;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.TimeZone;
+
+import static org.apache.flink.connector.jdbc.testutils.databases.cratedb.CrateDBDatabase.CONTAINER;
+
+/** Test base for {@link CrateDBCatalog}. */
+class CrateDBCatalogTestBase implements JdbcITCaseBase, CrateDBTestBase {
+
+ public static final Logger LOG = LoggerFactory.getLogger(CrateDBCatalogTestBase.class);
+
+ protected static final String TEST_CATALOG_NAME = "mycratedb";
+ protected static final String TEST_USERNAME = CONTAINER.getUsername();
+ protected static final String TEST_PWD = CONTAINER.getPassword();
+ protected static final String TEST_SCHEMA = "test_schema";
+ protected static final String TABLE1 = "t1";
+ protected static final String TABLE2 = "t2";
+ protected static final String TABLE3 = "t3";
+ protected static final String TABLE_PRIMITIVE_TYPE = "primitive_table";
+ protected static final String TABLE_TARGET_PRIMITIVE = "target_primitive_table";
+ protected static final String TABLE_ARRAY_TYPE = "array_table";
+
+ protected static String baseUrl;
+ protected static CrateDBCatalog catalog;
+
+ @BeforeAll
+ static void init() throws SQLException {
+ // For deterministic timestamp field results
+ TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
+
+ // jdbc:crate://localhost:50807/crate?user=crate
+ String jdbcUrl = CONTAINER.getJdbcUrl();
+ // jdbc:crate://localhost:50807
+ baseUrl = jdbcUrl.substring(0, jdbcUrl.lastIndexOf("/"));
+
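+ // CrateDB exposes exactly one database, named "crate", so the catalog always
+ // targets CrateDBCatalog.DEFAULT_DATABASE.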
+ catalog =
+ new CrateDBCatalog(
+ Thread.currentThread().getContextClassLoader(),
+ TEST_CATALOG_NAME,
+ CrateDBCatalog.DEFAULT_DATABASE,
+ TEST_USERNAME,
+ TEST_PWD,
+ baseUrl);
+
+ // create test tables
+ // table: crate.doc.t1
+ // table: crate.doc.t2
+ createTable(CrateDBTablePath.fromFlinkTableName(TABLE1), getSimpleTable().crateDBSchemaSql);
+ createTable(CrateDBTablePath.fromFlinkTableName(TABLE2), getSimpleTable().crateDBSchemaSql);
+
+ // table: crate.test_schema.t3
+ createTable(new CrateDBTablePath(TEST_SCHEMA, TABLE3), getSimpleTable().crateDBSchemaSql);
+ createTable(
+ CrateDBTablePath.fromFlinkTableName(TABLE_PRIMITIVE_TYPE),
+ getPrimitiveTable().crateDBSchemaSql);
+ createTable(
+ CrateDBTablePath.fromFlinkTableName(TABLE_TARGET_PRIMITIVE),
+ getTargetPrimitiveTable().crateDBSchemaSql);
+ createTable(
+ CrateDBTablePath.fromFlinkTableName(TABLE_ARRAY_TYPE),
+ getArrayTable().crateDBSchemaSql);
+
+ executeSQL(
+ String.format("insert into doc.%s values (%s);", TABLE1, getSimpleTable().values));
+ executeSQL(
+ String.format(
+ "insert into doc.%s values (%s);",
+ TABLE_PRIMITIVE_TYPE, getPrimitiveTable().values));
+ executeSQL(
+ String.format(
+ "insert into doc.%s values (%s);",
+ TABLE_ARRAY_TYPE, getArrayTable().values));
+ }
+
+ public static void createTable(CrateDBTablePath tablePath, String tableSchemaSql)
+ throws SQLException {
+ executeSQL(String.format("CREATE TABLE %s(%s);", tablePath.getFullPath(), tableSchemaSql));
+ executeSQL(String.format("REFRESH TABLE %s", tablePath.getFullPath()));
+ }
+
+ public static void executeSQL(String sql) throws SQLException {
+ try (Connection conn =
+ DriverManager.getConnection(
+ String.format("%s/%s", baseUrl, CrateDBCatalog.DEFAULT_DATABASE),
+ TEST_USERNAME,
+ TEST_PWD);
+ Statement statement = conn.createStatement()) {
+ statement.executeUpdate(sql);
+ }
+ }
+
+ /** Object holding schema and corresponding sql. */
+ public static class TestTable {
+ Schema schema;
+ String crateDBSchemaSql;
+ String values;
+
+ public TestTable(Schema schema, String crateDBSchemaSql, String values) {
+ this.schema = schema;
+ this.crateDBSchemaSql = crateDBSchemaSql;
+ this.values = values;
+ }
+ }
+
+ public static TestTable getSimpleTable() {
+ return new TestTable(
+ Schema.newBuilder().column("id", DataTypes.INT()).build(), "id integer", "1");
+ }
+
+ // TODO: add back timestamptz and time types.
+ // Flink currently doesn't support converting TIME precision and fails with:
+ // TableException: Unsupported conversion from data type 'TIME(6)' (conversion class:
+ // java.sql.Time) to type information. Only data types that originated from type
+ // information fully support a reverse conversion.
+ public static TestTable getPrimitiveTable() {
+ return new TestTable(
+ Schema.newBuilder()
+ .column("int", DataTypes.INT().notNull())
+ .column("short", DataTypes.SMALLINT().notNull())
+ .column("smallint", DataTypes.SMALLINT())
+ .column("long", DataTypes.BIGINT())
+ .column("bigint", DataTypes.BIGINT())
+ .column("float", DataTypes.FLOAT())
+ .column("real", DataTypes.FLOAT())
+ .column("double", DataTypes.DOUBLE())
+ .column("double_precision", DataTypes.DOUBLE())
+ .column("boolean", DataTypes.BOOLEAN())
+ .column("string", DataTypes.STRING())
+ .column("text", DataTypes.STRING())
+ .column("char", DataTypes.CHAR(1))
+ .column("character", DataTypes.CHAR(3))
+ .column("character_varying", DataTypes.VARCHAR(20))
+ .column("ip", DataTypes.STRING())
+ .column("timestamp", DataTypes.TIMESTAMP(6))
+ // .column("timestamptz", DataTypes.TIMESTAMP_WITH_TIME_ZONE(6))
+ .primaryKeyNamed("primitive_table_pk", "short", "int")
+ .build(),
+ "int integer, "
+ + "short short, "
+ + "smallint smallint, "
+ + "long long, "
+ + "bigint bigint, "
+ + "float float, "
+ + "real real, "
+ + "double double, "
+ + "double_precision double precision, "
+ + "boolean boolean, "
+ + "string string, "
+ + "text text, "
+ + "char char, "
+ + "character character(3), "
+ + "character_varying character varying(20), "
+ + "ip ip, "
+ + "timestamp timestamp, "
+ // + "timestamptz timestamptz, "
+ + "PRIMARY KEY (short, int)",
+ // Values
+ "1,"
+ + "3,"
+ + "3,"
+ + "4,"
+ + "4,"
+ + "5.5,"
+ + "5.5,"
+ + "6.6,"
+ + "6.6,"
+ + "true,"
+ + "'a',"
+ + "'b',"
+ + "'c',"
+ + "'d',"
+ + "'e',"
+ + "'0:0:0:0:0:ffff:c0a8:64',"
+ + "'2016-06-22 19:10:25.123456'");
+ // + "'2006-06-22 19:10:25.123456'");
+ }
+
+ // TODO: add back timestamptz once planner supports timestamp with timezone
+ public static TestTable getArrayTable() {
+ return new TestTable(
+ Schema.newBuilder()
+ .column("int_arr", DataTypes.ARRAY(DataTypes.INT()))
+ .column("short_arr", DataTypes.ARRAY(DataTypes.SMALLINT()))
+ .column("smallint_arr", DataTypes.ARRAY(DataTypes.SMALLINT()))
+ .column("long_arr", DataTypes.ARRAY(DataTypes.BIGINT()))
+ .column("bigint_arr", DataTypes.ARRAY(DataTypes.BIGINT()))
+ .column("float_arr", DataTypes.ARRAY(DataTypes.FLOAT()))
+ .column("real_arr", DataTypes.ARRAY(DataTypes.FLOAT()))
+ .column("double_arr", DataTypes.ARRAY(DataTypes.DOUBLE()))
+ .column("double_precision_arr", DataTypes.ARRAY(DataTypes.DOUBLE()))
+ .column("boolean_arr", DataTypes.ARRAY(DataTypes.BOOLEAN()))
+ .column("string_arr", DataTypes.ARRAY(DataTypes.STRING()))
+ .column("text_arr", DataTypes.ARRAY(DataTypes.STRING()))
+ .column("char_arr", DataTypes.ARRAY(DataTypes.CHAR(1)))
+ .column("character_arr", DataTypes.ARRAY(DataTypes.CHAR(3)))
+ .column("character_varying_arr", DataTypes.ARRAY(DataTypes.VARCHAR(20)))
+ .column("ip", DataTypes.ARRAY(DataTypes.STRING()))
+ .column("timestamp_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP(6)))
+ // .column("timestamptz_arr",
+ // DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(4)))
+ .column("null_text_arr", DataTypes.ARRAY(DataTypes.STRING()))
+ .build(),
+ "int_arr integer[], "
+ + "short_arr short[],"
+ + "smallint_arr smallint[],"
+ + "long_arr long[], "
+ + "bigint_arr bigint[], "
+ + "float_arr float[], "
+ + "real_arr real[], "
+ + "double_arr double[], "
+ + "double_precision_arr double precision[], "
+ + "boolean_arr boolean[], "
+ + "string_arr string[], "
+ + "text_arr text[], "
+ + "char_arr char[], "
+ + "character_arr character(3)[], "
+ + "character_varying_arr character varying(20)[],"
+ + "ip string[],"
+ + "timestamp_arr timestamp[], "
+ // + "timestamptz_arr timestamptz[], "
+ + "null_text_arr text[]",
+ // Values
+ "[1,2,3],"
+ + "[3,4,5],"
+ + "[3,4,5],"
+ + "[4,5,6],"
+ + "[4,5,6],"
+ + "[5.5,6.6,7.7],"
+ + "[5.5,6.6,7.7],"
+ + "[6.6,7.7,8.8],"
+ + "[6.6,7.7,8.8],"
+ + "[true,false,true],"
+ + "['a','b','c'],"
+ + "['a','b','c'],"
+ + "['b','c','d'],"
+ + "['b','c','d'],"
+ + "['b','c','d'],"
+ + "['0:0:0:0:0:ffff:c0a8:64', '10.2.5.28', '127.0.0.6'],"
+ + "['2016-06-22 19:10:25.123456', '2019-06-22 11:22:33.987654'],"
+ // + "['2006-06-22 19:10:25.123456', '2019-06-22 11:22:33.987654'],"
+ + "NULL");
+ }
+
+ public static TestTable getTargetPrimitiveTable() {
+ return new TestTable(
+ Schema.newBuilder()
+ .column("int", DataTypes.INT().notNull())
+ .column("short", DataTypes.SMALLINT().notNull())
+ .column("long", DataTypes.BIGINT())
+ .column("real", DataTypes.FLOAT())
+ .column("double", DataTypes.DOUBLE())
+ .column("boolean", DataTypes.BOOLEAN())
+ .column("text", DataTypes.STRING())
+ .column("timestamp", DataTypes.TIMESTAMP(6))
+ .build(),
+ "int integer, "
+ + "short short, "
+ + "long long, "
+ + "real real, "
+ + "double double, "
+ + "boolean boolean, "
+ + "text text, "
+ + "timestamp timestamp",
+ // Values
+ null);
+ }
+
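+ // CrateDB does not retain character length constraints: the catalog reports
+ // CHAR(n) at its maximum length and VARCHAR(n) as unbounded STRING, so the
+ // expected schema string is normalized before comparison.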
+ protected static String sanitizeSchemaSQL(String schemaSQL) {
+ return schemaSQL
+ .replaceAll("CHAR\\(\\d+\\)", "CHAR(2147483647)")
+ .replaceAll("VARCHAR\\(\\d+\\)", "STRING");
+ }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBTablePathTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBTablePathTest.java
new file mode 100644
index 00000000..76af0858
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBTablePathTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.cratedb.catalog;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link CrateDBTablePath}. */
+class CrateDBTablePathTest {
+
+ @Test
+ void testToFlinkTableName() {
+ assertThat(CrateDBTablePath.toFlinkTableName("my_schema", "my_table"))
+ .isEqualTo("my_schema.my_table");
+ assertThat(CrateDBTablePath.toFlinkTableName("crate.my_schema", "my_table"))
+ .isEqualTo("crate.my_schema.my_table");
+ assertThatThrownBy(() -> CrateDBTablePath.toFlinkTableName("", "my_table"))
+ .isExactlyInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Schema name is not valid. Null or empty is not allowed");
+ }
+
+ @Test
+ void testFromFlinkTableName() {
+ assertThat(CrateDBTablePath.fromFlinkTableName("my_schema.my_table"))
+ .isEqualTo(new CrateDBTablePath("my_schema", "my_table"));
+ assertThat(CrateDBTablePath.fromFlinkTableName("my_table"))
+ .isEqualTo(new CrateDBTablePath("doc", "my_table"));
+ assertThatThrownBy(() -> CrateDBTablePath.fromFlinkTableName("crate.doc.my_table"))
+ .isExactlyInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Table name 'crate.doc.my_table' is not valid. The parsed length is 3");
+ assertThatThrownBy(() -> CrateDBTablePath.fromFlinkTableName(""))
+ .isExactlyInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Table name is not valid. Null or empty is not allowed");
+ }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBTestBase.java
new file mode 100644
index 00000000..96f2c355
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBTestBase.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.cratedb.catalog;
+
+import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.DatabaseTest;
+import org.apache.flink.connector.jdbc.testutils.databases.cratedb.CrateDBDatabase;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/** Base class for CrateDB testing. */
+@ExtendWith(CrateDBDatabase.class)
+public interface CrateDBTestBase extends DatabaseTest {
+
+ @Override
+ default DatabaseMetadata getMetadata() {
+ return CrateDBDatabase.getMetadata();
+ }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresTablePathTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresTablePathTest.java
index 67988349..38625be2 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresTablePathTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresTablePathTest.java
@@ -21,12 +21,33 @@
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for {@link PostgresTablePath}. */
class PostgresTablePathTest {
+ @Test
+ void testToFlinkTableName() {
+ assertThat(PostgresTablePath.toFlinkTableName("my_schema", "my_table"))
+ .isEqualTo("my_schema.my_table");
+ assertThat(PostgresTablePath.toFlinkTableName("postgres.my_schema", "my_table"))
+ .isEqualTo("postgres.my_schema.my_table");
+ assertThatThrownBy(() -> PostgresTablePath.toFlinkTableName("", "my_table"))
+ .isExactlyInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Schema name is not valid. Null or empty is not allowed");
+ }
+
@Test
void testFromFlinkTableName() {
- assertThat(PostgresTablePath.fromFlinkTableName("public.topic"))
- .isEqualTo(new PostgresTablePath("public", "topic"));
+ assertThat(PostgresTablePath.fromFlinkTableName("my_schema.my_table"))
+ .isEqualTo(new PostgresTablePath("my_schema", "my_table"));
+ assertThat(PostgresTablePath.fromFlinkTableName("my_table"))
+ .isEqualTo(new PostgresTablePath("public", "my_table"));
+ assertThatThrownBy(() -> PostgresTablePath.fromFlinkTableName("postgres.public.my_table"))
+ .isExactlyInstanceOf(IllegalArgumentException.class)
+ .hasMessage(
+ "Table name 'postgres.public.my_table' is not valid. The parsed length is 3");
+ assertThatThrownBy(() -> PostgresTablePath.fromFlinkTableName(""))
+ .isExactlyInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Table name is not valid. Null or empty is not allowed");
}
}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/cratedb/CrateDBDialectTypeTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/cratedb/CrateDBDialectTypeTest.java
new file mode 100644
index 00000000..563adb2d
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/cratedb/CrateDBDialectTypeTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect.cratedb;
+
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectTypeTest;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** The CrateDB params for {@link JdbcDialectTypeTest}. */
+public class CrateDBDialectTypeTest extends JdbcDialectTypeTest {
+
+ @Override
+ protected String testDialect() {
+ return "crate";
+ }
+
+ @Override
+ protected List<TestItem> testData() {
+ return Arrays.asList(
+ createTestItem("CHAR"),
+ createTestItem("VARCHAR"),
+ createTestItem("BOOLEAN"),
+ createTestItem("TINYINT"),
+ createTestItem("SMALLINT"),
+ createTestItem("INTEGER"),
+ createTestItem("BIGINT"),
+ createTestItem("FLOAT"),
+ createTestItem("DOUBLE"),
+ createTestItem("DECIMAL(10, 4)"),
+ createTestItem("DECIMAL(38, 18)"),
+ createTestItem("DATE"),
+ createTestItem("TIME"),
+ createTestItem("TIMESTAMP(3)"),
+ createTestItem("TIMESTAMP WITHOUT TIME ZONE"),
+ createTestItem("ARRAY"),
+
+ // Not valid data
+ createTestItem("BINARY", "The CrateDB dialect doesn't support type: BINARY(1)."),
+ createTestItem(
+ "TIMESTAMP(9) WITHOUT TIME ZONE",
+ "The precision of field 'f0' is out of the TIMESTAMP precision range [1, 6] supported by CrateDB dialect."),
+ createTestItem("TIMESTAMP_LTZ(3)", "Unsupported type:TIMESTAMP_LTZ(3)"));
+ }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/cratedb/CrateDBDatabase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/cratedb/CrateDBDatabase.java
new file mode 100644
index 00000000..eae2148c
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/cratedb/CrateDBDatabase.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.testutils.databases.cratedb;
+
+import org.apache.flink.connector.jdbc.testutils.DatabaseExtension;
+import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.testcontainers.containers.JdbcDatabaseContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.containers.wait.strategy.WaitStrategy;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+
+import static java.time.temporal.ChronoUnit.SECONDS;
+
+/** A CrateDB database for testing. */
+public class CrateDBDatabase extends DatabaseExtension {
+
+ private static final String CRATEDB = "crate:5.3.1";
+ private static final int CRATEDB_PG_PORT = 5432;
+ private static final int CRATEDB_HTTP_PORT = 4200;
+
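+ // CrateDB speaks the PostgreSQL wire protocol on port 5432, so the image is
+ // declared a compatible substitute for postgres.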
+ private static final DockerImageName CRATEDB_DOCKER_IMAGE =
+ DockerImageName.parse(CRATEDB).asCompatibleSubstituteFor("postgres");
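+ // The node is considered ready once the HTTP admin endpoint on port 4200
+ // answers with status 200.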
+ private static final WaitStrategy WAIT_STRATEGY =
+ Wait.forHttp("/")
+ .forPort(CRATEDB_HTTP_PORT)
+ .forStatusCode(200)
+ .withStartupTimeout(Duration.of(60, SECONDS));
+
+ private static CrateDBMetadata metadata;
+
+ public static final CrateDBContainer CONTAINER =
+ new CrateDBContainer(CRATEDB_DOCKER_IMAGE)
+ .withDatabaseName("crate")
+ .withUsername("crate")
+ .withPassword("crate")
+ .withCommand("crate")
+ .withEnv("TZ", "UTC") // For deterministic timestamp field results
+ .waitingFor(WAIT_STRATEGY);
+
+ public static CrateDBMetadata getMetadata() {
+ if (!CONTAINER.isRunning()) {
+ throw new FlinkRuntimeException("Container is stopped.");
+ }
+ if (metadata == null) {
+ metadata = new CrateDBMetadata(CONTAINER);
+ }
+ return metadata;
+ }
+
+ @Override
+ protected DatabaseMetadata startDatabase() throws Exception {
+ CONTAINER.start();
+ return getMetadata();
+ }
+
+ @Override
+ protected void stopDatabase() throws Exception {
+ CONTAINER.stop();
+ metadata = null;
+ }
+
+ /** Workaround to use testcontainers with the legacy CrateDB JDBC driver. */
+ public static class CrateDBContainer extends JdbcDatabaseContainer<CrateDBContainer> {
+
+ public static final String IMAGE = "crate";
+
+ private String databaseName = "crate";
+
+ private String username = "crate";
+
+ private String password = "crate";
+
+ public CrateDBContainer(final DockerImageName dockerImageName) {
+ super(dockerImageName);
+ dockerImageName.assertCompatibleWith(DockerImageName.parse(IMAGE));
+
+ this.waitStrategy = Wait.forHttp("/").forPort(CRATEDB_HTTP_PORT).forStatusCode(200);
+
+ addExposedPort(CRATEDB_PG_PORT);
+ addExposedPort(CRATEDB_HTTP_PORT);
+ }
+
+ @Override
+ public String getDriverClassName() {
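+ // The legacy crate-jdbc driver; CrateDB can also be reached through the
+ // standard PostgreSQL JDBC driver, but these tests exercise crate-jdbc.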
+ return "io.crate.client.jdbc.CrateDriver";
+ }
+
+ @Override
+ public String getJdbcUrl() {
+ String additionalUrlParams = constructUrlParameters("?", "&");
+ return ("jdbc:crate://"
+ + getHost()
+ + ":"
+ + getMappedPort(CRATEDB_PG_PORT)
+ + "/"
+ + databaseName
+ + additionalUrlParams);
+ }
+
+ @Override
+ public String getDatabaseName() {
+ return databaseName;
+ }
+
+ @Override
+ public String getUsername() {
+ return username;
+ }
+
+ @Override
+ public String getPassword() {
+ return password;
+ }
+
+ @Override
+ public String getTestQueryString() {
+ return "SELECT 1";
+ }
+
+ @Override
+ public CrateDBContainer withDatabaseName(final String databaseName) {
+ this.databaseName = databaseName;
+ return self();
+ }
+
+ @Override
+ public CrateDBContainer withUsername(final String username) {
+ this.username = username;
+ return self();
+ }
+
+ @Override
+ public CrateDBContainer withPassword(final String password) {
+ this.password = password;
+ return self();
+ }
+
+ @Override
+ protected void waitUntilContainerStarted() {
+ getWaitStrategy().waitUntilReady(this);
+ }
+ }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/cratedb/CrateDBMetadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/cratedb/CrateDBMetadata.java
new file mode 100644
index 00000000..b44792dc
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/cratedb/CrateDBMetadata.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.testutils.databases.cratedb;
+
+import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+
+import javax.sql.XADataSource;
+
+/** CrateDB Metadata. */
+public class CrateDBMetadata implements DatabaseMetadata {
+
+ private final String username;
+ private final String password;
+ private final String url;
+ private final String driver;
+ private final String version;
+
+ public CrateDBMetadata(CrateDBDatabase.CrateDBContainer container) {
+ this.username = container.getUsername();
+ this.password = container.getPassword();
+ this.url = container.getJdbcUrl();
+ this.driver = container.getDriverClassName();
+ this.version = container.getDockerImageName();
+ }
+
+ @Override
+ public String getJdbcUrl() {
+ return this.url;
+ }
+
+ @Override
+ public String getJdbcUrlWithCredentials() {
+ return String.format("%s&user=%s&password=%s", getJdbcUrl(), getUsername(), getPassword());
+ }
+
+ @Override
+ public String getUsername() {
+ return this.username;
+ }
+
+ @Override
+ public String getPassword() {
+ return this.password;
+ }
+
+ @Override
+ public XADataSource buildXaDataSource() {
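+ // CrateDB has no XA/two-phase-commit support, so exactly-once JDBC sinks
+ // that rely on an XADataSource cannot be tested against it.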
+ throw new UnsupportedOperationException("CrateDB doesn't support XA");
+ }
+
+ @Override
+ public String getDriverClass() {
+ return this.driver;
+ }
+
+ @Override
+ public String getVersion() {
+ return version;
+ }
+}