diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java index c3cab5f410be3a..88d32a593e14dc 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java @@ -20,6 +20,7 @@ import org.apache.hadoop.security.UserGroupInformation; import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; import java.security.PrivilegedExceptionAction; public interface HadoopAuthenticator { @@ -31,6 +32,12 @@ default T doAs(PrivilegedExceptionAction action) throws IOException { return getUGI().doAs(action); } catch (InterruptedException e) { throw new IOException(e); + } catch (UndeclaredThrowableException e) { + if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException) e.getCause(); + } else { + throw new RuntimeException(e.getCause()); + } } } diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticator.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticator.java index 6260833b7db558..0d7cf60c6f751d 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticator.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticator.java @@ -17,6 +17,8 @@ package org.apache.doris.common.security.authentication; +import org.apache.hadoop.conf.Configuration; + import java.security.PrivilegedExceptionAction; import java.util.concurrent.Callable; @@ -40,6 +42,19 @@ public class PreExecutionAuthenticator { public PreExecutionAuthenticator() { } + /** + * Constructor to initialize the PreExecutionAuthenticator object. + * This constructor is responsible for initializing the Hadoop authenticator required for Kerberos authentication + * based on the provided configuration information. + * + * @param configuration Configuration information used to obtain Kerberos authentication settings + */ + public PreExecutionAuthenticator(Configuration configuration) { + AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(configuration); + this.hadoopAuthenticator = HadoopAuthenticator.getHadoopAuthenticator(config); + } + + /** * Executes the specified task with necessary authentication. *

If a HadoopAuthenticator is set, the task will be executed within a diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index 875c8142a6836a..81d3e15d90fc66 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -171,6 +171,17 @@ public ExternalCatalog(long catalogId, String name, InitCatalogLog.Type logType, this.comment = Strings.nullToEmpty(comment); } + /** + * Initializes the PreExecutionAuthenticator instance. + * This method ensures that the authenticator is created only once in a thread-safe manner. + * If additional authentication logic is required, it should be extended and implemented in subclasses. + */ + protected synchronized void initPreExecutionAuthenticator() { + if (preExecutionAuthenticator == null) { + preExecutionAuthenticator = new PreExecutionAuthenticator(); + } + } + public Configuration getConfiguration() { // build configuration is costly, so we cache it. if (cachedConf != null) { @@ -208,6 +219,11 @@ protected List listDatabaseNames() { } } + public ExternalMetadataOps getMetadataOps() { + makeSureInitialized(); + return metadataOps; + } + // Will be called when creating catalog(so when as replaying) // to add some default properties if missing. public void setDefaultPropsIfMissing(boolean isReplay) { @@ -1128,6 +1144,9 @@ public void truncateTable(TruncateTableStmt stmt) throws DdlException { } public PreExecutionAuthenticator getPreExecutionAuthenticator() { + if (null == preExecutionAuthenticator) { + throw new RuntimeException("PreExecutionAuthenticator is null, please confirm it is initialized."); + } return preExecutionAuthenticator; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java index 61a7c030477ece..defa2a8ebe9e14 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java @@ -25,7 +25,6 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.Pair; import org.apache.doris.common.ThreadPoolManager; -import org.apache.doris.common.security.authentication.AuthenticationConfig; import org.apache.doris.common.security.authentication.HadoopAuthenticator; import org.apache.doris.common.security.authentication.PreExecutionAuthenticator; import org.apache.doris.common.util.Util; @@ -170,14 +169,15 @@ public void checkProperties() throws DdlException { } @Override - protected void initLocalObjectsImpl() { - this.preExecutionAuthenticator = new PreExecutionAuthenticator(); - if (this.authenticator == null) { - AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(getConfiguration()); - this.authenticator = HadoopAuthenticator.getHadoopAuthenticator(config); - this.preExecutionAuthenticator.setHadoopAuthenticator(authenticator); + protected synchronized void initPreExecutionAuthenticator() { + if (preExecutionAuthenticator == null) { + preExecutionAuthenticator = new PreExecutionAuthenticator(getConfiguration()); } + } + @Override + protected void initLocalObjectsImpl() { + initPreExecutionAuthenticator(); HiveConf hiveConf = null; JdbcClientConfig jdbcClientConfig = null; String hiveMetastoreType = catalogProperty.getOrDefault(HMSProperties.HIVE_METASTORE_TYPE, ""); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java index 9bde47fb19903f..530731db81f565 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java @@ -72,7 +72,7 @@ public HiveMetadataOps(HiveConf hiveConf, JdbcClientConfig jdbcClientConfig, HMS this(catalog, createCachedClient(hiveConf, Math.max(MIN_CLIENT_POOL_SIZE, Config.max_external_cache_loader_thread_pool_size), jdbcClientConfig)); - hadoopAuthenticator = catalog.getAuthenticator(); + hadoopAuthenticator = catalog.getPreExecutionAuthenticator().getHadoopAuthenticator(); client.setHadoopAuthenticator(hadoopAuthenticator); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java index 523c31c3f74c67..7282de52aa0d22 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java @@ -40,6 +40,7 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog { public static final String ICEBERG_HADOOP = "hadoop"; public static final String ICEBERG_GLUE = "glue"; public static final String ICEBERG_DLF = "dlf"; + public static final String ICEBERG_S3_TABLES = "s3tables"; public static final String EXTERNAL_CATALOG_NAME = "external_catalog.name"; protected String icebergCatalogType; protected Catalog catalog; @@ -51,9 +52,16 @@ public IcebergExternalCatalog(long catalogId, String name, String comment) { // Create catalog based on catalog type protected abstract void initCatalog(); + @Override + protected synchronized void initPreExecutionAuthenticator() { + if (preExecutionAuthenticator == null) { + preExecutionAuthenticator = new PreExecutionAuthenticator(getConfiguration()); + } + } + @Override protected void initLocalObjectsImpl() { - preExecutionAuthenticator = new PreExecutionAuthenticator(); + initPreExecutionAuthenticator(); initCatalog(); IcebergMetadataOps ops = ExternalMetadataOperations.newIcebergMetadataOps(this, catalog); transactionManager = TransactionManagerFactory.createIcebergTransactionManager(ops); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopExternalCatalog.java index 5053e129ef6ef7..d8c9b7e1e2891e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopExternalCatalog.java @@ -53,15 +53,22 @@ public IcebergHadoopExternalCatalog(long catalogId, String name, String resource @Override protected void initCatalog() { icebergCatalogType = ICEBERG_HADOOP; - HadoopCatalog hadoopCatalog = new HadoopCatalog(); + Configuration conf = getConfiguration(); initS3Param(conf); // initialize hadoop catalog Map catalogProperties = catalogProperty.getProperties(); String warehouse = catalogProperty.getHadoopProperties().get(CatalogProperties.WAREHOUSE_LOCATION); + HadoopCatalog hadoopCatalog = new HadoopCatalog(); hadoopCatalog.setConf(conf); catalogProperties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse); - hadoopCatalog.initialize(getName(), catalogProperties); - catalog = hadoopCatalog; + try { + this.catalog = preExecutionAuthenticator.execute(() -> { + hadoopCatalog.initialize(getName(), catalogProperties); + return hadoopCatalog; + }); + } catch (Exception e) { + throw new RuntimeException("Hadoop catalog init error!", e); + } } }