diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java index 57d80c804c0c18..f6c7146c670067 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java @@ -47,6 +47,7 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog { public static final String ICEBERG_HADOOP = "hadoop"; public static final String ICEBERG_GLUE = "glue"; public static final String ICEBERG_DLF = "dlf"; + public static final String ICEBERG_JDBC = "jdbc"; public static final String ICEBERG_S3_TABLES = "s3tables"; public static final String EXTERNAL_CATALOG_NAME = "external_catalog.name"; public static final String ICEBERG_TABLE_META_CACHE_TTL_SECOND = "iceberg.table.meta.cache.ttl-second"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java index 748c0805393b1e..824d20e70007ba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java @@ -39,6 +39,8 @@ public static ExternalCatalog createCatalog(long catalogId, String name, String return new IcebergGlueExternalCatalog(catalogId, name, resource, props, comment); case IcebergExternalCatalog.ICEBERG_DLF: return new IcebergDLFExternalCatalog(catalogId, name, resource, props, comment); + case IcebergExternalCatalog.ICEBERG_JDBC: + return new IcebergJdbcExternalCatalog(catalogId, name, resource, props, comment); case IcebergExternalCatalog.ICEBERG_HADOOP: return new IcebergHadoopExternalCatalog(catalogId, name, resource, props, comment); case 
IcebergExternalCatalog.ICEBERG_S3_TABLES: diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergJdbcExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergJdbcExternalCatalog.java new file mode 100644 index 00000000000000..aeb2fd9deec18e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergJdbcExternalCatalog.java @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
package org.apache.doris.datasource.iceberg;

import org.apache.doris.datasource.CatalogProperty;

import java.util.Map;

/**
 * Iceberg external catalog backed by Iceberg's JDBC catalog implementation.
 *
 * <p>This class only records the raw user-supplied properties in
 * {@code catalogProperty}; the actual JDBC catalog options (uri, warehouse,
 * jdbc.* connection properties) are resolved later by the metastore-properties
 * layer (IcebergJdbcMetaStoreProperties) when the Iceberg catalog is built.
 */
public class IcebergJdbcExternalCatalog extends IcebergExternalCatalog {

    /**
     * @param catalogId unique id assigned by the catalog manager
     * @param name      user-visible catalog name
     * @param resource  optional resource name the properties are bound to
     * @param props     raw user-supplied catalog properties
     * @param comment   user-supplied comment for the catalog
     */
    public IcebergJdbcExternalCatalog(long catalogId, String name, String resource, Map<String, String> props,
            String comment) {
        super(catalogId, name, comment);
        catalogProperty = new CatalogProperty(resource, props);
    }
}
package org.apache.doris.datasource.property.metastore;

import org.apache.doris.catalog.JdbcResource;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.property.ConnectorProperty;
import org.apache.doris.datasource.property.storage.AbstractS3CompatibleProperties;
import org.apache.doris.datasource.property.storage.StorageProperties;

import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.aws.AwsClientProperties;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Metastore properties for Iceberg catalogs that use Iceberg's JDBC catalog
 * as the metadata store.
 *
 * <p>Translates Doris catalog properties into the option map consumed by
 * {@link CatalogUtil#buildIcebergCatalog}, and optionally registers a
 * user-supplied JDBC driver jar with {@link java.sql.DriverManager} so that
 * Iceberg's JdbcCatalog (which calls DriverManager.getConnection directly)
 * can find it.
 */
public class IcebergJdbcMetaStoreProperties extends AbstractIcebergProperties {
    private static final Logger LOG = LogManager.getLogger(IcebergJdbcMetaStoreProperties.class);

    private static final String JDBC_PREFIX = "jdbc.";

    // One URLClassLoader per driver jar, reused across catalog re-creations.
    private static final Map<URL, ClassLoader> DRIVER_CLASS_LOADER_CACHE = new ConcurrentHashMap<>();

    // "url|class" pairs already registered with DriverManager. Without this,
    // every catalog (re-)creation would register another DriverShim instance,
    // leaking duplicate registrations in DriverManager's global driver list.
    private static final Set<String> REGISTERED_DRIVERS = ConcurrentHashMap.newKeySet();

    // Final option map handed to Iceberg's JDBC catalog; built once in
    // initNormalizeAndCheckProps().
    private Map<String, String> icebergJdbcCatalogProperties;

    @ConnectorProperty(
            names = {"uri", "iceberg.jdbc.uri"},
            required = true,
            description = "JDBC connection URI for the Iceberg JDBC catalog."
    )
    private String uri = "";

    @ConnectorProperty(
            names = {"iceberg.jdbc.user"},
            required = false,
            description = "Username for the Iceberg JDBC catalog."
    )
    private String jdbcUser;

    @ConnectorProperty(
            names = {"iceberg.jdbc.password"},
            required = false,
            sensitive = true,
            description = "Password for the Iceberg JDBC catalog."
    )
    private String jdbcPassword;

    @ConnectorProperty(
            names = {"iceberg.jdbc.init-catalog-tables"},
            required = false,
            description = "Whether to create catalog tables if they do not exist."
    )
    private String jdbcInitCatalogTables;

    @ConnectorProperty(
            names = {"iceberg.jdbc.schema-version"},
            required = false,
            description = "Iceberg JDBC catalog schema version (V0/V1)."
    )
    private String jdbcSchemaVersion;

    @ConnectorProperty(
            names = {"iceberg.jdbc.strict-mode"},
            required = false,
            description = "Whether to enforce strict JDBC catalog schema checks."
    )
    private String jdbcStrictMode;

    @ConnectorProperty(
            names = {"iceberg.jdbc.driver_url"},
            required = false,
            description = "JDBC driver JAR file path or URL. "
                    + "Can be a local file name (will look in $DORIS_HOME/plugins/jdbc_drivers/) "
                    + "or a full URL (http://, https://, file://)."
    )
    private String driverUrl;

    @ConnectorProperty(
            names = {"iceberg.jdbc.driver_class"},
            required = false,
            description = "JDBC driver class name. If not specified, will be auto-detected from the JDBC URI."
    )
    private String driverClass;

    public IcebergJdbcMetaStoreProperties(Map<String, String> props) {
        super(props);
    }

    @Override
    public String getIcebergCatalogType() {
        return IcebergExternalCatalog.ICEBERG_JDBC;
    }

    @Override
    public void initNormalizeAndCheckProps() {
        super.initNormalizeAndCheckProps();
        initIcebergJdbcCatalogProperties();
    }

    @Override
    protected void checkRequiredProperties() {
        super.checkRequiredProperties();
        // The JDBC catalog has no server-side default warehouse, so require it
        // explicitly here (uri is enforced via @ConnectorProperty(required = true)).
        if (StringUtils.isBlank(warehouse)) {
            throw new IllegalArgumentException("Property warehouse is required.");
        }
    }

    /**
     * Builds the Iceberg JDBC catalog from the normalized option map plus
     * FileIO/storage settings.
     *
     * <p>NOTE(review): the {@code catalogProps} argument is not merged into the
     * options here; only the normalized jdbc options and storage-derived FileIO
     * properties are passed through — confirm this matches the superclass contract.
     */
    @Override
    public Catalog initCatalog(String catalogName, Map<String, String> catalogProps,
            List<StorageProperties> storagePropertiesList) {
        Map<String, String> fileIOProperties = Maps.newHashMap();
        Configuration conf = new Configuration();
        toFileIOProperties(storagePropertiesList, fileIOProperties, conf);

        Map<String, String> options = Maps.newHashMap(getIcebergJdbcCatalogProperties());
        options.putAll(fileIOProperties);

        // Support dynamic JDBC driver loading.
        // We must register the driver with DriverManager because Iceberg uses
        // DriverManager.getConnection(), which doesn't respect Thread.contextClassLoader.
        if (StringUtils.isNotBlank(driverUrl)) {
            registerJdbcDriver(driverUrl, driverClass);
            LOG.info("Using dynamic JDBC driver from: {}", driverUrl);
        }
        return CatalogUtil.buildIcebergCatalog(catalogName, options, conf);
    }

    /**
     * Register a JDBC driver with DriverManager, exactly once per (url, class) pair.
     *
     * <p>This is necessary because DriverManager.getConnection() doesn't use
     * Thread.contextClassLoader; it resolves drivers through the caller's
     * ClassLoader. Registering a shim makes the dynamically-loaded driver visible.
     *
     * @param driverUrl       path or URL to the JDBC driver JAR
     * @param driverClassName driver class name to register; must not be blank
     * @throws IllegalArgumentException if the URL is invalid, the class name is
     *                                  blank, or the class cannot be loaded
     */
    private void registerJdbcDriver(String driverUrl, String driverClassName) {
        // Validate before building/caching any classloader so a misconfigured
        // catalog doesn't pollute the loader cache.
        if (StringUtils.isBlank(driverClassName)) {
            throw new IllegalArgumentException("driver_class is required when driver_url is specified");
        }
        String registrationKey = null;
        try {
            String fullDriverUrl = JdbcResource.getFullDriverUrl(driverUrl);
            URL url = new URL(fullDriverUrl);

            // Skip if this exact driver was already registered (e.g. the catalog
            // was dropped and re-created); DriverManager registrations are global.
            registrationKey = url + "|" + driverClassName;
            if (!REGISTERED_DRIVERS.add(registrationKey)) {
                return;
            }

            ClassLoader classLoader = DRIVER_CLASS_LOADER_CACHE.computeIfAbsent(url, u -> {
                ClassLoader parent = getClass().getClassLoader();
                return URLClassLoader.newInstance(new URL[] {u}, parent);
            });

            // Load the driver class and register it with DriverManager.
            Class<?> loadedDriverClass = Class.forName(driverClassName, true, classLoader);
            java.sql.Driver driver = (java.sql.Driver) loadedDriverClass.getDeclaredConstructor().newInstance();

            // Wrap with a shim driver because DriverManager refuses to use a driver
            // not loaded by the system classloader.
            java.sql.DriverManager.registerDriver(new DriverShim(driver));
            LOG.info("Successfully registered JDBC driver: {} from {}", driverClassName, fullDriverUrl);
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Invalid driver URL: " + driverUrl, e);
        } catch (ClassNotFoundException e) {
            // Allow a later retry after the admin fixes the driver jar.
            REGISTERED_DRIVERS.remove(registrationKey);
            throw new IllegalArgumentException("Failed to load JDBC driver class: " + driverClassName, e);
        } catch (Exception e) {
            REGISTERED_DRIVERS.remove(registrationKey);
            throw new RuntimeException("Failed to register JDBC driver: " + driverClassName, e);
        }
    }

    /**
     * A shim driver that wraps the actual driver loaded from a custom ClassLoader.
     * Needed because DriverManager refuses to use a driver that wasn't loaded by
     * the system classloader; every call simply delegates.
     */
    private static class DriverShim implements java.sql.Driver {
        private final java.sql.Driver delegate;

        DriverShim(java.sql.Driver delegate) {
            this.delegate = delegate;
        }

        @Override
        public java.sql.Connection connect(String url, java.util.Properties info) throws java.sql.SQLException {
            return delegate.connect(url, info);
        }

        @Override
        public boolean acceptsURL(String url) throws java.sql.SQLException {
            return delegate.acceptsURL(url);
        }

        @Override
        public java.sql.DriverPropertyInfo[] getPropertyInfo(String url, java.util.Properties info)
                throws java.sql.SQLException {
            return delegate.getPropertyInfo(url, info);
        }

        @Override
        public int getMajorVersion() {
            return delegate.getMajorVersion();
        }

        @Override
        public int getMinorVersion() {
            return delegate.getMinorVersion();
        }

        @Override
        public boolean jdbcCompliant() {
            return delegate.jdbcCompliant();
        }

        @Override
        public java.util.logging.Logger getParentLogger() throws java.sql.SQLFeatureNotSupportedException {
            return delegate.getParentLogger();
        }
    }

    /** @return an unmodifiable view of the normalized Iceberg JDBC catalog options */
    public Map<String, String> getIcebergJdbcCatalogProperties() {
        return Collections.unmodifiableMap(icebergJdbcCatalogProperties);
    }

    /**
     * Builds the option map for Iceberg's JDBC catalog: catalog type, uri,
     * warehouse, the well-known jdbc.* options, and finally any user-supplied
     * "jdbc."-prefixed keys passed through verbatim (explicit options win).
     */
    private void initIcebergJdbcCatalogProperties() {
        icebergJdbcCatalogProperties = new HashMap<>();
        icebergJdbcCatalogProperties.put(CatalogUtil.ICEBERG_CATALOG_TYPE, CatalogUtil.ICEBERG_CATALOG_TYPE_JDBC);
        icebergJdbcCatalogProperties.put(CatalogProperties.URI, uri);
        if (StringUtils.isNotBlank(warehouse)) {
            icebergJdbcCatalogProperties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse);
        }
        addIfNotBlank(icebergJdbcCatalogProperties, "jdbc.user", jdbcUser);
        addIfNotBlank(icebergJdbcCatalogProperties, "jdbc.password", jdbcPassword);
        addIfNotBlank(icebergJdbcCatalogProperties, "jdbc.init-catalog-tables", jdbcInitCatalogTables);
        addIfNotBlank(icebergJdbcCatalogProperties, "jdbc.schema-version", jdbcSchemaVersion);
        addIfNotBlank(icebergJdbcCatalogProperties, "jdbc.strict-mode", jdbcStrictMode);

        if (origProps != null) {
            for (Map.Entry<String, String> entry : origProps.entrySet()) {
                String key = entry.getKey();
                if (key != null && key.startsWith(JDBC_PREFIX)
                        && !icebergJdbcCatalogProperties.containsKey(key)) {
                    icebergJdbcCatalogProperties.put(key, entry.getValue());
                }
            }
        }
    }

    /** Puts {@code key -> value} into {@code props} only when value is non-blank. */
    private static void addIfNotBlank(Map<String, String> props, String key, String value) {
        if (StringUtils.isNotBlank(value)) {
            props.put(key, value);
        }
    }

    /**
     * Derives FileIO options and Hadoop configuration from the configured
     * storage backends (currently S3-compatible endpoints plus any Hadoop
     * storage config resources).
     */
    private static void toFileIOProperties(List<StorageProperties> storagePropertiesList,
            Map<String, String> fileIOProperties, Configuration conf) {
        for (StorageProperties storageProperties : storagePropertiesList) {
            if (storageProperties instanceof AbstractS3CompatibleProperties) {
                toS3FileIOProperties((AbstractS3CompatibleProperties) storageProperties, fileIOProperties);
            }
            if (storageProperties.getHadoopStorageConfig() != null) {
                conf.addResource(storageProperties.getHadoopStorageConfig());
            }
        }
    }

    /** Maps Doris S3-compatible storage settings onto Iceberg S3FileIO options. */
    private static void toS3FileIOProperties(AbstractS3CompatibleProperties s3Properties,
            Map<String, String> options) {
        if (StringUtils.isNotBlank(s3Properties.getEndpoint())) {
            options.put(S3FileIOProperties.ENDPOINT, s3Properties.getEndpoint());
        }
        if (StringUtils.isNotBlank(s3Properties.getUsePathStyle())) {
            options.put(S3FileIOProperties.PATH_STYLE_ACCESS, s3Properties.getUsePathStyle());
        }
        if (StringUtils.isNotBlank(s3Properties.getRegion())) {
            options.put(AwsClientProperties.CLIENT_REGION, s3Properties.getRegion());
        }
        if (StringUtils.isNotBlank(s3Properties.getAccessKey())) {
            options.put(S3FileIOProperties.ACCESS_KEY_ID, s3Properties.getAccessKey());
        }
        if (StringUtils.isNotBlank(s3Properties.getSecretKey())) {
            options.put(S3FileIOProperties.SECRET_ACCESS_KEY, s3Properties.getSecretKey());
        }
        if (StringUtils.isNotBlank(s3Properties.getSessionToken())) {
            options.put(S3FileIOProperties.SESSION_TOKEN, s3Properties.getSessionToken());
        }
    }
}
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergPropertiesFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergPropertiesFactory.java index 64fd28216cfb7f..333c6c44806ce7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergPropertiesFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergPropertiesFactory.java @@ -43,6 +43,7 @@ public IcebergPropertiesFactory() { register("hadoop", IcebergFileSystemMetaStoreProperties::new); register("s3tables", IcebergS3TablesMetaStoreProperties::new); register("dlf", IcebergAliyunDLFMetaStoreProperties::new); + register("jdbc", IcebergJdbcMetaStoreProperties::new); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java index c0f31f879596f2..39863f1b2a520c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java @@ -145,6 +145,7 @@ import org.apache.doris.datasource.iceberg.IcebergGlueExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergHMSExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergHadoopExternalCatalog; +import org.apache.doris.datasource.iceberg.IcebergJdbcExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergRestExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergS3TablesExternalCatalog; import org.apache.doris.datasource.infoschema.ExternalInfoSchemaDatabase; @@ -411,6 +412,7 @@ public class GsonUtils { .registerSubtype(IcebergRestExternalCatalog.class, IcebergRestExternalCatalog.class.getSimpleName()) .registerSubtype(IcebergDLFExternalCatalog.class, IcebergDLFExternalCatalog.class.getSimpleName()) .registerSubtype(IcebergHadoopExternalCatalog.class, 
IcebergHadoopExternalCatalog.class.getSimpleName()) + .registerSubtype(IcebergJdbcExternalCatalog.class, IcebergJdbcExternalCatalog.class.getSimpleName()) .registerSubtype(IcebergS3TablesExternalCatalog.class, IcebergS3TablesExternalCatalog.class.getSimpleName()) .registerSubtype(PaimonExternalCatalog.class, PaimonExternalCatalog.class.getSimpleName()) diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStorePropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStorePropertiesTest.java new file mode 100644 index 00000000000000..b35782b20339d6 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStorePropertiesTest.java @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.property.metastore; + +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +public class IcebergJdbcMetaStorePropertiesTest { + + @Test + public void testBasicJdbcProperties() { + Map props = new HashMap<>(); + props.put("uri", "jdbc:mysql://localhost:3306/iceberg"); + props.put("warehouse", "s3://warehouse/path"); + props.put("jdbc.user", "iceberg"); + props.put("jdbc.password", "secret"); + + IcebergJdbcMetaStoreProperties jdbcProps = new IcebergJdbcMetaStoreProperties(props); + jdbcProps.initNormalizeAndCheckProps(); + + Map catalogProps = jdbcProps.getIcebergJdbcCatalogProperties(); + Assertions.assertEquals(CatalogUtil.ICEBERG_CATALOG_TYPE_JDBC, + catalogProps.get(CatalogUtil.ICEBERG_CATALOG_TYPE)); + Assertions.assertEquals("jdbc:mysql://localhost:3306/iceberg", catalogProps.get(CatalogProperties.URI)); + Assertions.assertEquals("s3://warehouse/path", catalogProps.get(CatalogProperties.WAREHOUSE_LOCATION)); + Assertions.assertEquals("iceberg", catalogProps.get("jdbc.user")); + Assertions.assertEquals("secret", catalogProps.get("jdbc.password")); + } + + @Test + public void testJdbcPrefixPassthrough() { + Map props = new HashMap<>(); + props.put("uri", "jdbc:mysql://localhost:3306/iceberg"); + props.put("warehouse", "s3://warehouse/path"); + props.put("jdbc.useSSL", "true"); + props.put("jdbc.verifyServerCertificate", "true"); + + IcebergJdbcMetaStoreProperties jdbcProps = new IcebergJdbcMetaStoreProperties(props); + jdbcProps.initNormalizeAndCheckProps(); + + Map catalogProps = jdbcProps.getIcebergJdbcCatalogProperties(); + Assertions.assertEquals("true", catalogProps.get("jdbc.useSSL")); + Assertions.assertEquals("true", catalogProps.get("jdbc.verifyServerCertificate")); + } + + @Test + public void testMissingWarehouse() { + Map props = new HashMap<>(); + 
props.put("uri", "jdbc:mysql://localhost:3306/iceberg"); + + IcebergJdbcMetaStoreProperties jdbcProps = new IcebergJdbcMetaStoreProperties(props); + Assertions.assertThrows(IllegalArgumentException.class, jdbcProps::initNormalizeAndCheckProps); + } + + @Test + public void testMissingUri() { + Map props = new HashMap<>(); + props.put("warehouse", "s3://warehouse/path"); + + IcebergJdbcMetaStoreProperties jdbcProps = new IcebergJdbcMetaStoreProperties(props); + Assertions.assertThrows(IllegalArgumentException.class, jdbcProps::initNormalizeAndCheckProps); + } +} diff --git a/regression-test/data/external_table_p0/iceberg/test_iceberg_jdbc_catalog.out b/regression-test/data/external_table_p0/iceberg/test_iceberg_jdbc_catalog.out new file mode 100644 index 00000000000000..9e7a05f3757d72 --- /dev/null +++ b/regression-test/data/external_table_p0/iceberg/test_iceberg_jdbc_catalog.out @@ -0,0 +1,42 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !datatypes_select -- +false 2 200000000000 2.5 3.5 234.56 world 2025-01-02 2025-01-02T11:00 +true 1 100000000000 1.5 2.5 123.45 hello 2025-01-01 2025-01-01T10:00 +true 3 300000000000 3.5 4.5 345.67 test 2025-01-03 2025-01-03T12:00 + +-- !datatypes_count -- +3 + +-- !datatypes_filter -- +1 hello +3 test + +-- !partition_select -- +1 Item1 A 2025-01-01 +2 Item2 A 2025-01-01 +3 Item3 B 2025-01-02 +4 Item4 B 2025-01-02 +5 Item5 A 2025-01-03 + +-- !partition_filter -- +1 Item1 A 2025-01-01 +2 Item2 A 2025-01-01 +5 Item5 A 2025-01-03 + +-- !sys_snapshots -- +1 + +-- !sys_history -- +1 + +-- !after_overwrite -- +1 Item1 A 2025-01-01 +2 Item2 A 2025-01-01 +3 Item3 B 2025-01-02 +4 Item4 B 2025-01-02 +5 Item5 A 2025-01-03 + +-- !mysql_select -- +1 Alice 2025-01-01T10:00 +2 Bob 2025-01-02T11:00 + diff --git a/regression-test/suites/external_table_p0/iceberg/test_iceberg_jdbc_catalog.groovy b/regression-test/suites/external_table_p0/iceberg/test_iceberg_jdbc_catalog.groovy new file mode 
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_iceberg_jdbc_catalog", "p0,external,iceberg,external_docker,external_docker_iceberg") {
    String enabled = context.config.otherConfigs.get("enableIcebergTest")
    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
        logger.info("Iceberg test is not enabled, skip this test")
        return
    }

    String enabledJdbc = context.config.otherConfigs.get("enableJdbcTest")
    if (enabledJdbc == null || !enabledJdbc.equalsIgnoreCase("true")) {
        logger.info("Iceberg JDBC catalog test requires enableJdbcTest, skip this test")
        return
    }

    // Get test environment configuration
    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
    String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
    String jdbc_port = context.config.otherConfigs.get("pg_14_port")

    // JDBC Catalog specific test - uses PostgreSQL as the metadata store.
    // If the PostgreSQL port is not configured, this test will be skipped.
    if (jdbc_port == null || jdbc_port.isEmpty()) {
        logger.info("Iceberg JDBC catalog PostgreSQL port not configured (pg_14_port), skip this test")
        return
    }

    if (minio_port == null || minio_port.isEmpty() || externalEnvIp == null) {
        logger.info("Iceberg test environment not fully configured, skip this test")
        return
    }

    String catalog_name = "test_iceberg_jdbc_catalog"
    String db_name = "jdbc_test_db"
    String driver_name = "postgresql-42.5.0.jar"
    String driver_download_url = "${getS3Url()}/regression/jdbc_driver/${driver_name}"
    String jdbc_drivers_dir = getFeConfig("jdbc_drivers_dir")
    String local_driver_dir = "${context.config.dataPath}/jdbc_driver"
    String local_driver_path = "${local_driver_dir}/${driver_name}"
    String pg_db = "postgres"
    String mysql_db = "iceberg_db"

    // MySQL driver config
    String mysql_driver_name = "mysql-connector-java-5.1.49-v2.jar"
    String mysql_driver_download_url = "${getS3Url()}/regression/jdbc_driver/mysql-connector-java-5.1.49.jar"
    String local_mysql_driver_path = "${local_driver_dir}/${mysql_driver_name}"

    // Runs a shell command; when mustSuc is true a non-zero exit fails the suite.
    def executeCommand = { String cmd, Boolean mustSuc ->
        try {
            logger.info("execute ${cmd}")
            def proc = new ProcessBuilder("/bin/bash", "-c", cmd).redirectErrorStream(true).start()
            int exitcode = proc.waitFor()
            if (exitcode != 0) {
                logger.info("exit code: ${exitcode}, output\n: ${proc.text}")
                if (mustSuc) {
                    assertTrue(false, "Execute failed: ${cmd}")
                }
            }
        } catch (IOException e) {
            assertTrue(false, "Execute timeout: ${cmd}")
        }
    }

    // Ensure the PostgreSQL JDBC driver is available on all FE/BE nodes.
    def host_ips = new ArrayList()
    String[][] backends = sql """ show backends """
    for (def b in backends) {
        host_ips.add(b[1])
    }
    String[][] frontends = sql """ show frontends """
    for (def f in frontends) {
        host_ips.add(f[1])
    }
    host_ips = host_ips.unique()

    executeCommand("mkdir -p ${local_driver_dir}", false)
    if (!new File(local_driver_path).exists()) {
        executeCommand("/usr/bin/curl --max-time 600 ${driver_download_url} --output ${local_driver_path}", true)
    }
    if (!new File(local_mysql_driver_path).exists()) {
        executeCommand("/usr/bin/curl --max-time 600 ${mysql_driver_download_url} --output ${local_mysql_driver_path}", true)
    }
    for (def ip in host_ips) {
        executeCommand("ssh -o StrictHostKeyChecking=no root@${ip} \"mkdir -p ${jdbc_drivers_dir}\"", false)
        scpFiles("root", ip, local_driver_path, jdbc_drivers_dir, false)
        scpFiles("root", ip, local_mysql_driver_path, jdbc_drivers_dir, false)
    }

    try {
        // Clean up existing catalog
        sql """DROP CATALOG IF EXISTS ${catalog_name}"""

        // Create Iceberg JDBC Catalog with PostgreSQL backend and MinIO storage
        sql """
            CREATE CATALOG ${catalog_name} PROPERTIES (
                'type' = 'iceberg',
                'iceberg.catalog.type' = 'jdbc',
                'uri' = 'jdbc:postgresql://${externalEnvIp}:${jdbc_port}/${pg_db}',
                'warehouse' = 's3://warehouse/jdbc_wh/',
                'iceberg.jdbc.driver_url' = '${driver_name}',
                'iceberg.jdbc.driver_class' = 'org.postgresql.Driver',
                'iceberg.jdbc.user' = 'postgres',
                'iceberg.jdbc.password' = '123456',
                'iceberg.jdbc.init-catalog-tables' = 'true',
                'iceberg.jdbc.schema-version' = 'V1',
                's3.endpoint' = 'http://${externalEnvIp}:${minio_port}',
                's3.access_key' = 'admin',
                's3.secret_key' = 'password',
                's3.region' = 'us-east-1'
            )
        """

        // Switch to the catalog
        sql """SWITCH ${catalog_name}"""

        // Test: Show catalogs
        def catalogs = sql """SHOW CATALOGS"""
        assertTrue(catalogs.toString().contains(catalog_name))

        // Test: Create database
        sql """DROP DATABASE IF EXISTS ${db_name} FORCE"""
        sql """CREATE DATABASE ${db_name}"""

        def databases = sql """SHOW DATABASES"""
        assertTrue(databases.toString().contains(db_name))

        sql """USE ${db_name}"""

        // Test: Create non-partitioned table with various data types
        sql """DROP TABLE IF EXISTS test_datatypes"""
        sql """
            CREATE TABLE test_datatypes (
                c_boolean BOOLEAN,
                c_int INT,
                c_bigint BIGINT,
                c_float FLOAT,
                c_double DOUBLE,
                c_decimal DECIMAL(10, 2),
                c_string STRING,
                c_date DATE,
                c_datetime DATETIME
            ) PROPERTIES (
                'write-format' = 'parquet',
                'compression-codec' = 'zstd'
            )
        """

        def tables = sql """SHOW TABLES"""
        assertTrue(tables.toString().contains("test_datatypes"))

        // Test: Insert data with various types
        sql """
            INSERT INTO test_datatypes VALUES
            (true, 1, 100000000000, 1.5, 2.5, 123.45, 'hello', '2025-01-01', '2025-01-01 10:00:00'),
            (false, 2, 200000000000, 2.5, 3.5, 234.56, 'world', '2025-01-02', '2025-01-02 11:00:00'),
            (true, 3, 300000000000, 3.5, 4.5, 345.67, 'test', '2025-01-03', '2025-01-03 12:00:00')
        """

        // Test: Query data with different data types
        order_qt_datatypes_select """SELECT * FROM test_datatypes ORDER BY c_int"""
        order_qt_datatypes_count """SELECT count(*) FROM test_datatypes"""
        order_qt_datatypes_filter """SELECT c_int, c_string FROM test_datatypes WHERE c_boolean = true ORDER BY c_int"""

        // Test: Create partitioned table
        sql """DROP TABLE IF EXISTS test_partitioned"""
        sql """
            CREATE TABLE test_partitioned (
                id INT,
                name STRING,
                category STRING,
                event_date DATE
            )
            PARTITION BY LIST (category) ()
            PROPERTIES (
                'write-format' = 'parquet'
            )
        """

        // Test: Insert into partitioned table
        sql """
            INSERT INTO test_partitioned VALUES
            (1, 'Item1', 'A', '2025-01-01'),
            (2, 'Item2', 'A', '2025-01-01'),
            (3, 'Item3', 'B', '2025-01-02'),
            (4, 'Item4', 'B', '2025-01-02'),
            (5, 'Item5', 'A', '2025-01-03')
        """

        order_qt_partition_select """SELECT * FROM test_partitioned ORDER BY id"""
        order_qt_partition_filter """SELECT * FROM test_partitioned WHERE category = 'A' ORDER BY id"""

        // Test: System tables
        order_qt_sys_snapshots """SELECT count(*) FROM test_datatypes\$snapshots"""
        order_qt_sys_history """SELECT count(*) FROM test_datatypes\$history"""

        // Test: DESCRIBE TABLE
        def desc = sql """DESCRIBE test_datatypes"""
        assertTrue(desc.toString().contains("c_int"))
        assertTrue(desc.toString().contains("c_string"))

        // Test: INSERT OVERWRITE
        sql """
            INSERT OVERWRITE TABLE test_partitioned
            SELECT * FROM test_partitioned WHERE category = 'A'
        """
        order_qt_after_overwrite """SELECT * FROM test_partitioned ORDER BY id"""

        // Test: Drop table
        sql """DROP TABLE IF EXISTS test_datatypes"""
        sql """DROP TABLE IF EXISTS test_partitioned"""

        // Test: Drop database
        sql """DROP DATABASE IF EXISTS ${db_name} FORCE"""

        logger.info("Iceberg JDBC Catalog test completed successfully")

        // MySQL Catalog Test
        String mysql_port = context.config.otherConfigs.get("mysql_57_port")
        if (mysql_port != null) {
            // Clean up MySQL database to remove old metadata.
            // This prevents issues where the database contains metadata pointing to invalid S3 locations.
            String cleanupCmd = "mysql -h ${externalEnvIp} -P ${mysql_port} -u root -p123456 -e 'DROP DATABASE IF EXISTS iceberg_db; CREATE DATABASE iceberg_db;'"
            executeCommand(cleanupCmd, false)

            String mysql_catalog_name = "iceberg_jdbc_mysql"
            try {
                sql """DROP CATALOG IF EXISTS ${mysql_catalog_name}"""
                sql """
                    CREATE CATALOG ${mysql_catalog_name} PROPERTIES (
                        'type' = 'iceberg',
                        'iceberg.catalog.type' = 'jdbc',
                        'uri' = 'jdbc:mysql://${externalEnvIp}:${mysql_port}/${mysql_db}',
                        'warehouse' = 's3://warehouse/jdbc_wh_mysql/',
                        'iceberg.jdbc.driver_url' = 'file://${jdbc_drivers_dir}/${mysql_driver_name}',
                        'iceberg.jdbc.driver_class' = 'com.mysql.jdbc.Driver',
                        'iceberg.jdbc.user' = 'root',
                        'iceberg.jdbc.password' = '123456',
                        'iceberg.jdbc.init-catalog-tables' = 'true',
                        'iceberg.jdbc.schema-version' = 'V1',
                        's3.endpoint' = 'http://${externalEnvIp}:${minio_port}',
                        's3.access_key' = 'admin',
                        's3.secret_key' = 'password',
                        's3.region' = 'us-east-1'
                    )
                """

                sql """SWITCH ${mysql_catalog_name}"""

                String mysql_db_name = "mysql_test_db"
                sql """DROP DATABASE IF EXISTS ${mysql_db_name} FORCE"""
                sql """CREATE DATABASE ${mysql_db_name}"""
                sql """USE ${mysql_db_name}"""

                sql """DROP TABLE IF EXISTS test_mysql_catalog"""
                sql """
                    CREATE TABLE test_mysql_catalog (
                        id INT,
                        name STRING,
                        ts DATETIME
                    ) PROPERTIES (
                        'write-format' = 'parquet'
                    )
                """

                sql """
                    INSERT INTO test_mysql_catalog VALUES
                    (1, 'Alice', '2025-01-01 10:00:00'),
                    (2, 'Bob', '2025-01-02 11:00:00')
                """

                order_qt_mysql_select """SELECT * FROM test_mysql_catalog ORDER BY id"""

                sql """DROP TABLE IF EXISTS test_mysql_catalog"""
                sql """DROP DATABASE IF EXISTS ${mysql_db_name} FORCE"""

                logger.info("Iceberg JDBC Catalog (MySQL) test completed successfully")
            } catch (Exception e) {
                // Rethrow so a broken MySQL-backed catalog fails the suite rather
                // than being silently skipped; the fixture data depends on it.
                logger.warn("MySQL Catalog test failed: ${e.message}")
                throw e
            } finally {
                try {
                    sql """SWITCH internal"""
                    sql """DROP CATALOG IF EXISTS ${mysql_catalog_name}"""
                } catch (Exception e) {
                    logger.warn("Failed to cleanup MySQL catalog: ${e.message}")
                }
            }
        }

    } finally {
        // Cleanup
        try {
            sql """SWITCH internal"""
            sql """DROP CATALOG IF EXISTS ${catalog_name}"""
        } catch (Exception e) {
            logger.warn("Failed to cleanup catalog: ${e.message}")
        }
    }
}