HadoopAuthenticator.java
@@ -20,6 +20,7 @@
 import org.apache.hadoop.security.UserGroupInformation;
 
 import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
 import java.security.PrivilegedExceptionAction;
 
 public interface HadoopAuthenticator {
@@ -31,6 +32,12 @@ default <T> T doAs(PrivilegedExceptionAction<T> action) throws IOException {
             return getUGI().doAs(action);
         } catch (InterruptedException e) {
             throw new IOException(e);
+        } catch (UndeclaredThrowableException e) {
+            if (e.getCause() instanceof RuntimeException) {
+                throw (RuntimeException) e.getCause();
+            } else {
+                throw new RuntimeException(e.getCause());
+            }
         }
     }
 
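Why the new catch branch: UserGroupInformation.doAs declares only IOException and InterruptedException, so other exceptions thrown inside the action can surface wrapped in an UndeclaredThrowableException, hiding the real cause. A minimal caller sketch; the helper class and task body are illustrative, not part of this patch:

import org.apache.doris.common.security.authentication.HadoopAuthenticator;

import java.io.IOException;

class DoAsUnwrapExample {
    // Hypothetical helper: runs a trivial task as the authenticated user.
    // With the new catch branch, a RuntimeException thrown by the task
    // reaches this caller directly instead of arriving wrapped in an
    // UndeclaredThrowableException.
    static String currentUser(HadoopAuthenticator authenticator) throws IOException {
        return authenticator.doAs(() -> System.getProperty("user.name"));
    }
}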
PreExecutionAuthenticator.java
@@ -17,6 +17,8 @@
 
 package org.apache.doris.common.security.authentication;
 
+import org.apache.hadoop.conf.Configuration;
+
 import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.Callable;
 
@@ -40,6 +42,19 @@ public class PreExecutionAuthenticator {
     public PreExecutionAuthenticator() {
     }
 
+    /**
+     * Constructor that initializes the PreExecutionAuthenticator.
+     * It sets up the Hadoop authenticator required for Kerberos authentication
+     * based on the provided configuration.
+     *
+     * @param configuration configuration used to obtain Kerberos authentication settings
+     */
+    public PreExecutionAuthenticator(Configuration configuration) {
+        AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(configuration);
+        this.hadoopAuthenticator = HadoopAuthenticator.getHadoopAuthenticator(config);
+    }
+
+
     /**
      * Executes the specified task with necessary authentication.
      * <p>If a HadoopAuthenticator is set, the task will be executed within a
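With the Configuration-based constructor, callers no longer wire AuthenticationConfig and HadoopAuthenticator by hand. A minimal usage sketch, assuming execute keeps its Callable-based signature and that the Configuration carries the usual Kerberos principal and keytab properties; the class and return value are illustrative:

import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;

import org.apache.hadoop.conf.Configuration;

class KerberosExecuteExample {
    static String runAuthenticated() throws Exception {
        // Kerberos settings are resolved inside the constructor via
        // AuthenticationConfig.getKerberosConfig(conf).
        Configuration conf = new Configuration();
        PreExecutionAuthenticator auth = new PreExecutionAuthenticator(conf);
        // Runs the task under the authenticated UGI when a HadoopAuthenticator
        // is present, or directly otherwise.
        return auth.execute(() -> "ran with authentication");
    }
}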
ExternalCatalog.java
@@ -171,6 +171,17 @@ public ExternalCatalog(long catalogId, String name, InitCatalogLog.Type logType,
         this.comment = Strings.nullToEmpty(comment);
     }
 
+    /**
+     * Initializes the PreExecutionAuthenticator instance.
+     * Ensures the authenticator is created only once, in a thread-safe manner.
+     * Subclasses that need additional authentication logic should override this method.
+     */
+    protected synchronized void initPreExecutionAuthenticator() {
+        if (preExecutionAuthenticator == null) {
+            preExecutionAuthenticator = new PreExecutionAuthenticator();
+        }
+    }
+
     public Configuration getConfiguration() {
         // build configuration is costly, so we cache it.
         if (cachedConf != null) {
@@ -208,6 +219,11 @@ protected List<String> listDatabaseNames() {
         }
     }
 
+    public ExternalMetadataOps getMetadataOps() {
+        makeSureInitialized();
+        return metadataOps;
+    }
+
     // Will be called when creating catalog(so when as replaying)
     // to add some default properties if missing.
     public void setDefaultPropsIfMissing(boolean isReplay) {
@@ -1128,6 +1144,9 @@ public void truncateTable(TruncateTableStmt stmt) throws DdlException {
     }
 
     public PreExecutionAuthenticator getPreExecutionAuthenticator() {
+        if (null == preExecutionAuthenticator) {
+            throw new RuntimeException("PreExecutionAuthenticator is null, please confirm it is initialized.");
+        }
         return preExecutionAuthenticator;
     }
 
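Taken together, these ExternalCatalog changes form a small lifecycle contract: initialization creates the authenticator at most once, and the getter fails fast when that step was skipped. A sketch of the intended call order, using only methods visible in this diff; the helper class is hypothetical and the imports assume the usual Doris package layout:

import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.datasource.ExternalCatalog;

class CatalogAuthExample {
    static PreExecutionAuthenticator authenticatorOf(ExternalCatalog catalog) {
        // getMetadataOps() calls makeSureInitialized(), which drives
        // initLocalObjectsImpl(); the overrides in this PR run
        // initPreExecutionAuthenticator() as their first step.
        catalog.getMetadataOps();
        // Fails fast with a RuntimeException if initialization was skipped.
        return catalog.getPreExecutionAuthenticator();
    }
}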
HMSExternalCatalog.java
@@ -25,7 +25,6 @@
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.ThreadPoolManager;
-import org.apache.doris.common.security.authentication.AuthenticationConfig;
 import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
 import org.apache.doris.common.util.Util;
@@ -170,14 +169,15 @@ public void checkProperties() throws DdlException {
     }
 
     @Override
-    protected void initLocalObjectsImpl() {
-        this.preExecutionAuthenticator = new PreExecutionAuthenticator();
-        if (this.authenticator == null) {
-            AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(getConfiguration());
-            this.authenticator = HadoopAuthenticator.getHadoopAuthenticator(config);
-            this.preExecutionAuthenticator.setHadoopAuthenticator(authenticator);
+    protected synchronized void initPreExecutionAuthenticator() {
+        if (preExecutionAuthenticator == null) {
+            preExecutionAuthenticator = new PreExecutionAuthenticator(getConfiguration());
         }
+    }
 
+    @Override
+    protected void initLocalObjectsImpl() {
+        initPreExecutionAuthenticator();
         HiveConf hiveConf = null;
         JdbcClientConfig jdbcClientConfig = null;
         String hiveMetastoreType = catalogProperty.getOrDefault(HMSProperties.HIVE_METASTORE_TYPE, "");
HiveMetadataOps.java
@@ -72,7 +72,7 @@ public HiveMetadataOps(HiveConf hiveConf, JdbcClientConfig jdbcClientConfig, HMS
         this(catalog, createCachedClient(hiveConf,
                 Math.max(MIN_CLIENT_POOL_SIZE, Config.max_external_cache_loader_thread_pool_size),
                 jdbcClientConfig));
-        hadoopAuthenticator = catalog.getAuthenticator();
+        hadoopAuthenticator = catalog.getPreExecutionAuthenticator().getHadoopAuthenticator();
         client.setHadoopAuthenticator(hadoopAuthenticator);
     }
 
IcebergExternalCatalog.java
@@ -40,6 +40,7 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
     public static final String ICEBERG_HADOOP = "hadoop";
     public static final String ICEBERG_GLUE = "glue";
     public static final String ICEBERG_DLF = "dlf";
+    public static final String ICEBERG_S3_TABLES = "s3tables";
     public static final String EXTERNAL_CATALOG_NAME = "external_catalog.name";
     protected String icebergCatalogType;
     protected Catalog catalog;
@@ -51,9 +52,16 @@ public IcebergExternalCatalog(long catalogId, String name, String comment) {
     // Create catalog based on catalog type
     protected abstract void initCatalog();
 
+    @Override
+    protected synchronized void initPreExecutionAuthenticator() {
+        if (preExecutionAuthenticator == null) {
+            preExecutionAuthenticator = new PreExecutionAuthenticator(getConfiguration());
+        }
+    }
+
     @Override
     protected void initLocalObjectsImpl() {
-        preExecutionAuthenticator = new PreExecutionAuthenticator();
+        initPreExecutionAuthenticator();
         initCatalog();
         IcebergMetadataOps ops = ExternalMetadataOperations.newIcebergMetadataOps(this, catalog);
         transactionManager = TransactionManagerFactory.createIcebergTransactionManager(ops);
IcebergHadoopExternalCatalog.java
@@ -53,15 +53,22 @@ public IcebergHadoopExternalCatalog(long catalogId, String name, String resource
     @Override
     protected void initCatalog() {
         icebergCatalogType = ICEBERG_HADOOP;
-        HadoopCatalog hadoopCatalog = new HadoopCatalog();
+
         Configuration conf = getConfiguration();
         initS3Param(conf);
         // initialize hadoop catalog
         Map<String, String> catalogProperties = catalogProperty.getProperties();
         String warehouse = catalogProperty.getHadoopProperties().get(CatalogProperties.WAREHOUSE_LOCATION);
+        HadoopCatalog hadoopCatalog = new HadoopCatalog();
         hadoopCatalog.setConf(conf);
         catalogProperties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse);
-        hadoopCatalog.initialize(getName(), catalogProperties);
-        catalog = hadoopCatalog;
+        try {
+            this.catalog = preExecutionAuthenticator.execute(() -> {
+                hadoopCatalog.initialize(getName(), catalogProperties);
+                return hadoopCatalog;
+            });
+        } catch (Exception e) {
+            throw new RuntimeException("Hadoop catalog init error!", e);
+        }
     }
 }
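Running initialize inside preExecutionAuthenticator.execute matters because HadoopCatalog.initialize touches the warehouse path, which on a kerberized HDFS must happen under the authenticated UGI. The sketch below illustrates the presumed execute semantics, consistent with how it is called here; it is an illustration, not the actual PreExecutionAuthenticator source:

import org.apache.doris.common.security.authentication.HadoopAuthenticator;

import java.util.concurrent.Callable;

// Hypothetical sketch of the execute(...) semantics, not the actual
// PreExecutionAuthenticator implementation.
class AuthenticatedExecutor {
    private final HadoopAuthenticator hadoopAuthenticator; // null when no Kerberos

    AuthenticatedExecutor(HadoopAuthenticator hadoopAuthenticator) {
        this.hadoopAuthenticator = hadoopAuthenticator;
    }

    <T> T execute(Callable<T> task) throws Exception {
        if (hadoopAuthenticator != null) {
            // Run the task as the authenticated user; the unwrapping added to
            // HadoopAuthenticator.doAs above keeps the real cause visible.
            return hadoopAuthenticator.doAs(task::call);
        }
        // No authenticator configured: run the task directly.
        return task.call();
    }
}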