HadoopAuthenticator.java
@@ -20,6 +20,7 @@
 import org.apache.hadoop.security.UserGroupInformation;
 
 import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
 import java.security.PrivilegedExceptionAction;
 
 public interface HadoopAuthenticator {
@@ -31,6 +32,12 @@ default <T> T doAs(PrivilegedExceptionAction<T> action) throws IOException {
             return getUGI().doAs(action);
         } catch (InterruptedException e) {
             throw new IOException(e);
+        } catch (UndeclaredThrowableException e) {
+            if (e.getCause() instanceof RuntimeException) {
+                throw (RuntimeException) e.getCause();
+            } else {
+                throw new RuntimeException(e.getCause());
+            }
         }
     }
 
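For context: UserGroupInformation.doAs can rethrow an exception raised inside the privileged action wrapped in an UndeclaredThrowableException, and the new catch block surfaces the underlying RuntimeException instead of the reflective wrapper. A minimal, self-contained sketch of that unwrapping rule (the class name and sample exception are illustrative, not part of this change):

import java.lang.reflect.UndeclaredThrowableException;

public class UnwrapSketch {
    // Same rule as the new catch block in HadoopAuthenticator.doAs:
    // rethrow the real cause rather than the reflective wrapper.
    static RuntimeException unwrap(UndeclaredThrowableException e) {
        if (e.getCause() instanceof RuntimeException) {
            return (RuntimeException) e.getCause();
        }
        return new RuntimeException(e.getCause());
    }

    public static void main(String[] args) {
        UndeclaredThrowableException wrapped =
                new UndeclaredThrowableException(new IllegalStateException("kerberos ticket expired"));
        // Prints the original IllegalStateException, not the UndeclaredThrowableException wrapper.
        System.out.println(unwrap(wrapped));
    }
}
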
PreExecutionAuthenticator.java
@@ -17,6 +17,8 @@
 
 package org.apache.doris.common.security.authentication;
 
+import org.apache.hadoop.conf.Configuration;
+
 import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.Callable;
 
@@ -40,6 +42,19 @@ public class PreExecutionAuthenticator {
     public PreExecutionAuthenticator() {
     }
 
+    /**
+     * Constructor to initialize the PreExecutionAuthenticator object.
+     * This constructor is responsible for initializing the Hadoop authenticator required for Kerberos authentication
+     * based on the provided configuration information.
+     *
+     * @param configuration Configuration information used to obtain Kerberos authentication settings
+     */
+    public PreExecutionAuthenticator(Configuration configuration) {
+        AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(configuration);
+        this.hadoopAuthenticator = HadoopAuthenticator.getHadoopAuthenticator(config);
+    }
+
+
     /**
      * Executes the specified task with necessary authentication.
      * <p>If a HadoopAuthenticator is set, the task will be executed within a
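
A hypothetical caller sketch for the new Configuration-based constructor. It assumes execute(...) accepts a Callable-style lambda and may throw Exception, which matches how IcebergHadoopExternalCatalog uses it later in this change; the wrapper class and method name are illustrative:

import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.hadoop.conf.Configuration;

public class PreExecutionAuthenticatorExample {
    // The new constructor derives the Kerberos settings from the Configuration via
    // AuthenticationConfig.getKerberosConfig and wires in the matching HadoopAuthenticator.
    static String runAuthenticated(Configuration conf) {
        PreExecutionAuthenticator authenticator = new PreExecutionAuthenticator(conf);
        try {
            return authenticator.execute(() -> "ran under the configured credentials");
        } catch (Exception e) {
            throw new RuntimeException("authenticated task failed", e);
        }
    }
}
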
ExternalCatalog.java
@@ -176,6 +176,17 @@ public ExternalCatalog(long catalogId, String name, InitCatalogLog.Type logType,
         this.comment = Strings.nullToEmpty(comment);
     }
 
+    /**
+     * Initializes the PreExecutionAuthenticator instance.
+     * This method ensures that the authenticator is created only once in a thread-safe manner.
+     * If additional authentication logic is required, it should be extended and implemented in subclasses.
+     */
+    protected synchronized void initPreExecutionAuthenticator() {
+        if (preExecutionAuthenticator == null) {
+            preExecutionAuthenticator = new PreExecutionAuthenticator();
+        }
+    }
+
     public Configuration getConfiguration() {
         // build configuration is costly, so we cache it.
         if (cachedConf != null) {
@@ -213,6 +224,11 @@ protected List<String> listDatabaseNames() {
         }
     }
 
+    public ExternalMetadataOps getMetadataOps() {
+        makeSureInitialized();
+        return metadataOps;
+    }
+
     // Will be called when creating catalog(so when as replaying)
     // to add some default properties if missing.
     public void setDefaultPropsIfMissing(boolean isReplay) {
@@ -1137,6 +1153,9 @@ public void setAutoAnalyzePolicy(String dbName, String tableName, String policy)
     }
 
     public PreExecutionAuthenticator getPreExecutionAuthenticator() {
+        if (null == preExecutionAuthenticator) {
+            throw new RuntimeException("PreExecutionAuthenticator is null, please confirm it is initialized.");
+        }
         return preExecutionAuthenticator;
     }
 
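Taken together, the changes above define a small contract: a synchronized, idempotent init hook that subclasses may override, plus a getter that fails fast instead of returning null. A standalone sketch of that contract (LazyAuthHolder and its members are illustrative stand-ins, not Doris classes):

public class LazyAuthHolder {
    private Object authenticator; // stands in for PreExecutionAuthenticator

    // Idempotent and synchronized, like initPreExecutionAuthenticator(): repeated calls are no-ops,
    // and a Kerberos-aware subclass would override this to build the instance from its Configuration.
    protected synchronized void initAuthenticator() {
        if (authenticator == null) {
            authenticator = new Object();
        }
    }

    // Fails fast with a clear message instead of handing out null, mirroring the new guard
    // in getPreExecutionAuthenticator().
    public Object getAuthenticator() {
        if (authenticator == null) {
            throw new RuntimeException("authenticator is null, please confirm it is initialized.");
        }
        return authenticator;
    }
}
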
HMSExternalCatalog.java
@@ -25,8 +25,6 @@
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.ThreadPoolManager;
-import org.apache.doris.common.security.authentication.AuthenticationConfig;
-import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.CatalogProperty;
@@ -53,7 +51,6 @@
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import lombok.Getter;
 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.iceberg.hive.HiveCatalog;
@@ -91,8 +88,6 @@ public class HMSExternalCatalog extends ExternalCatalog {
 
     private static final int FILE_SYSTEM_EXECUTOR_THREAD_NUM = 16;
     private ThreadPoolExecutor fileSystemExecutor;
-    @Getter
-    private HadoopAuthenticator authenticator;
 
     private int hmsEventsBatchSizePerRpc = -1;
     private boolean enableHmsEventsIncrementalSync = false;
@@ -170,14 +165,15 @@ public void checkProperties() throws DdlException {
     }
 
     @Override
-    protected void initLocalObjectsImpl() {
-        this.preExecutionAuthenticator = new PreExecutionAuthenticator();
-        if (this.authenticator == null) {
-            AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(getConfiguration());
-            this.authenticator = HadoopAuthenticator.getHadoopAuthenticator(config);
-            this.preExecutionAuthenticator.setHadoopAuthenticator(authenticator);
+    protected synchronized void initPreExecutionAuthenticator() {
+        if (preExecutionAuthenticator == null) {
+            preExecutionAuthenticator = new PreExecutionAuthenticator(getConfiguration());
         }
+    }
 
+    @Override
+    protected void initLocalObjectsImpl() {
+        initPreExecutionAuthenticator();
         HiveConf hiveConf = null;
         JdbcClientConfig jdbcClientConfig = null;
         String hiveMetastoreType = catalogProperty.getOrDefault(HMSProperties.HIVE_METASTORE_TYPE, "");

HiveMetadataOps.java
@@ -72,7 +72,7 @@ public HiveMetadataOps(HiveConf hiveConf, JdbcClientConfig jdbcClientConfig, HMS
         this(catalog, createCachedClient(hiveConf,
                 Math.max(MIN_CLIENT_POOL_SIZE, Config.max_external_cache_loader_thread_pool_size),
                 jdbcClientConfig));
-        hadoopAuthenticator = catalog.getAuthenticator();
+        hadoopAuthenticator = catalog.getPreExecutionAuthenticator().getHadoopAuthenticator();
         client.setHadoopAuthenticator(hadoopAuthenticator);
     }
 
IcebergExternalCatalog.java
@@ -52,9 +52,16 @@ public IcebergExternalCatalog(long catalogId, String name, String comment) {
     // Create catalog based on catalog type
     protected abstract void initCatalog();
 
+    @Override
+    protected synchronized void initPreExecutionAuthenticator() {
+        if (preExecutionAuthenticator == null) {
+            preExecutionAuthenticator = new PreExecutionAuthenticator(getConfiguration());
+        }
+    }
+
     @Override
     protected void initLocalObjectsImpl() {
-        preExecutionAuthenticator = new PreExecutionAuthenticator();
+        initPreExecutionAuthenticator();
         initCatalog();
         IcebergMetadataOps ops = ExternalMetadataOperations.newIcebergMetadataOps(this, catalog);
         transactionManager = TransactionManagerFactory.createIcebergTransactionManager(ops);

IcebergHadoopExternalCatalog.java
@@ -52,15 +52,22 @@ public IcebergHadoopExternalCatalog(long catalogId, String name, String resource
     @Override
     protected void initCatalog() {
         icebergCatalogType = ICEBERG_HADOOP;
-        HadoopCatalog hadoopCatalog = new HadoopCatalog();
+
         Configuration conf = getConfiguration();
         initS3Param(conf);
         // initialize hadoop catalog
         Map<String, String> catalogProperties = catalogProperty.getProperties();
         String warehouse = catalogProperty.getHadoopProperties().get(CatalogProperties.WAREHOUSE_LOCATION);
+        HadoopCatalog hadoopCatalog = new HadoopCatalog();
         hadoopCatalog.setConf(conf);
         catalogProperties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse);
-        hadoopCatalog.initialize(getName(), catalogProperties);
-        catalog = hadoopCatalog;
+        try {
+            this.catalog = preExecutionAuthenticator.execute(() -> {
+                hadoopCatalog.initialize(getName(), catalogProperties);
+                return hadoopCatalog;
+            });
+        } catch (Exception e) {
+            throw new RuntimeException("Hadoop catalog init error!", e);
+        }
     }
 }