@@ -17,11 +17,10 @@
 
 package org.apache.doris.hudi;
 
-
 import org.apache.doris.common.jni.JniScanner;
 import org.apache.doris.common.jni.vec.ColumnType;
 import org.apache.doris.common.security.authentication.AuthenticationConfig;
-import org.apache.doris.common.security.authentication.HadoopUGI;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.avro.generic.GenericDatumReader;
@@ -160,14 +159,15 @@ public void open() throws IOException {
             cleanResolverLock.readLock().lock();
             try {
                 lastUpdateTime.set(System.currentTimeMillis());
+                AuthenticationConfig authenticationConfig = AuthenticationConfig.getKerberosConfig(split.hadoopConf());
+                HadoopAuthenticator hadoopAuthenticator = HadoopAuthenticator
+                        .getHadoopAuthenticator(authenticationConfig);
                 if (split.incrementalRead()) {
-                    recordIterator = HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(
-                            split.hadoopConf()),
-                            () -> new MORIncrementalSplitReader(split).buildScanIterator(new Filter[0]));
+                    recordIterator = hadoopAuthenticator.doAs(() -> new MORIncrementalSplitReader(split)
+                            .buildScanIterator(new Filter[0]));
                 } else {
-                    recordIterator = HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(
-                            split.hadoopConf()),
-                            () -> new MORSnapshotSplitReader(split).buildScanIterator(new Filter[0]));
+                    recordIterator = hadoopAuthenticator.doAs(() -> new MORSnapshotSplitReader(split)
+                            .buildScanIterator(new Filter[0]));
                 }
                 if (AVRO_RESOLVER_CACHE != null && AVRO_RESOLVER_CACHE.get() != null) {
                     cachedResolvers.computeIfAbsent(Thread.currentThread().getId(),
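The hunks above replace the per-call HadoopUGI.ugiDoAs(AuthenticationConfig, action) wrapper with a HadoopAuthenticator that is resolved once from the Kerberos config and then reused for each privileged action. A minimal sketch of the resulting pattern, assuming only the APIs visible in this diff (AuthenticationConfig.getKerberosConfig, HadoopAuthenticator.getHadoopAuthenticator, and a doAs that takes a PrivilegedExceptionAction and throws IOException); the class and method names below are illustrative, not part of the PR:

```java
import java.io.IOException;
import java.security.PrivilegedExceptionAction;

import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.security.authentication.HadoopAuthenticator;
import org.apache.hadoop.conf.Configuration;

// Illustrative helper (not from the PR): resolve the authenticator once, then run every
// Kerberos-protected action through doAs() instead of wrapping each call in HadoopUGI.ugiDoAs().
public final class AuthenticatedRunner {
    private final HadoopAuthenticator authenticator;

    public AuthenticatedRunner(Configuration conf) {
        AuthenticationConfig authConf = AuthenticationConfig.getKerberosConfig(conf);
        this.authenticator = HadoopAuthenticator.getHadoopAuthenticator(authConf);
    }

    public <T> T run(PrivilegedExceptionAction<T> action) {
        try {
            // doAs declares IOException in this PR; callers either propagate or wrap it.
            return authenticator.doAs(action);
        } catch (IOException e) {
            throw new RuntimeException("Privileged action failed", e);
        }
    }
}
```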
@@ -18,7 +18,7 @@
 package org.apache.doris.hudi;
 
 import org.apache.doris.common.security.authentication.AuthenticationConfig;
-import org.apache.doris.common.security.authentication.HadoopUGI;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -77,7 +77,13 @@ public static void killProcess(long pid) {
 
     public static HoodieTableMetaClient getMetaClient(Configuration conf, String basePath) {
         HadoopStorageConfiguration hadoopStorageConfiguration = new HadoopStorageConfiguration(conf);
-        return HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(conf), () -> HoodieTableMetaClient.builder()
-                .setConf(hadoopStorageConfiguration).setBasePath(basePath).build());
+        AuthenticationConfig authenticationConfig = AuthenticationConfig.getKerberosConfig(conf);
+        HadoopAuthenticator hadoopAuthenticator = HadoopAuthenticator.getHadoopAuthenticator(authenticationConfig);
+        try {
+            return hadoopAuthenticator.doAs(() -> HoodieTableMetaClient.builder()
+                    .setConf(hadoopStorageConfiguration).setBasePath(basePath).build());
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to get HoodieTableMetaClient", e);
+        }
     }
 }

This file was deleted.

@@ -40,7 +40,7 @@
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.security.authentication.AuthenticationConfig;
-import org.apache.doris.common.security.authentication.HadoopUGI;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.fs.remote.dfs.DFSFileSystem;
 import org.apache.doris.thrift.TExprOpcode;
@@ -68,6 +68,7 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
@@ -823,19 +824,22 @@ public static <T> T ugiDoAs(long catalogId, PrivilegedExceptionAction<T> action)
 
     public static <T> T ugiDoAs(Configuration conf, PrivilegedExceptionAction<T> action) {
         // if hive config is not ready, then use hadoop kerberos to login
-        AuthenticationConfig krbConfig = AuthenticationConfig.getKerberosConfig(conf,
-                AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL,
-                AuthenticationConfig.HADOOP_KERBEROS_KEYTAB);
-        return HadoopUGI.ugiDoAs(krbConfig, action);
+        AuthenticationConfig authenticationConfig = AuthenticationConfig.getKerberosConfig(conf);
+        HadoopAuthenticator hadoopAuthenticator = HadoopAuthenticator.getHadoopAuthenticator(authenticationConfig);
+        try {
+            return hadoopAuthenticator.doAs(action);
+        } catch (IOException e) {
+            LOG.warn("HiveMetaStoreClientHelper ugiDoAs failed.", e);
+            throw new RuntimeException(e);
+        }
     }
 
     public static HoodieTableMetaClient getHudiClient(HMSExternalTable table) {
         String hudiBasePath = table.getRemoteTable().getSd().getLocation();
         Configuration conf = getConfiguration(table);
         HadoopStorageConfiguration hadoopStorageConfiguration = new HadoopStorageConfiguration(conf);
-        return HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(conf),
-                () -> HoodieTableMetaClient.builder().setConf(hadoopStorageConfiguration).setBasePath(hudiBasePath)
-                        .build());
+        return ugiDoAs(conf, () -> HoodieTableMetaClient.builder().setConf(hadoopStorageConfiguration)
+                .setBasePath(hudiBasePath).build());
     }
 
     public static Configuration getConfiguration(HMSExternalTable table) {
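For context, a hedged caller sketch of the consolidated ugiDoAs(Configuration, PrivilegedExceptionAction) helper after this change. Only getKerberosConfig, getHadoopAuthenticator, and doAs come from the diff above; the class name, the filesystem probe, and the warehouse-location parameter are illustrative assumptions, and the import of HiveMetaStoreClientHelper (the class patched above) is omitted because its package path is not shown here:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical call site (not from the PR): Hadoop access that must run under the
// catalog's Kerberos identity can go through the single ugiDoAs(conf, action) helper,
// which now resolves the HadoopAuthenticator internally and rethrows IOException as an
// unchecked RuntimeException, so existing callers keep their old exception contract.
public class WarehousePathCheck {
    public static boolean pathExists(Configuration conf, String location) {
        return HiveMetaStoreClientHelper.ugiDoAs(conf,
                () -> FileSystem.get(conf).exists(new Path(location)));
    }
}
```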
@@ -19,7 +19,7 @@
 
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.security.authentication.AuthenticationConfig;
-import org.apache.doris.common.security.authentication.HadoopUGI;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.datasource.CatalogProperty;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.InitCatalogLog;
@@ -40,6 +40,7 @@
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.options.Options;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -53,6 +54,7 @@ public abstract class PaimonExternalCatalog extends ExternalCatalog {
     protected String catalogType;
     protected Catalog catalog;
     protected AuthenticationConfig authConf;
+    protected HadoopAuthenticator hadoopAuthenticator;
 
     private static final List<String> REQUIRED_PROPERTIES = ImmutableList.of(
             PaimonProperties.WAREHOUSE
@@ -71,9 +73,8 @@ protected void initLocalObjectsImpl() {
         for (Map.Entry<String, String> propEntry : this.catalogProperty.getHadoopProperties().entrySet()) {
             conf.set(propEntry.getKey(), propEntry.getValue());
         }
-        authConf = AuthenticationConfig.getKerberosConfig(conf,
-                AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL,
-                AuthenticationConfig.HADOOP_KERBEROS_KEYTAB);
+        authConf = AuthenticationConfig.getKerberosConfig(conf);
+        hadoopAuthenticator = HadoopAuthenticator.getHadoopAuthenticator(authConf);
     }
 
     public String getCatalogType() {
@@ -82,40 +83,57 @@ public String getCatalogType() {
     }
 
     protected List<String> listDatabaseNames() {
-        return HadoopUGI.ugiDoAs(authConf, () -> new ArrayList<>(catalog.listDatabases()));
+        try {
+            return hadoopAuthenticator.doAs(() -> new ArrayList<>(catalog.listDatabases()));
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to list databases names, catalog name: " + getName(), e);
+        }
     }
 
     @Override
     public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
         makeSureInitialized();
-        return HadoopUGI.ugiDoAs(authConf, () -> catalog.tableExists(Identifier.create(dbName, tblName)));
+        try {
+            return hadoopAuthenticator.doAs(() -> catalog.tableExists(Identifier.create(dbName, tblName)));
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to check table existence, catalog name: " + getName(), e);
+        }
     }
 
     @Override
     public List<String> listTableNames(SessionContext ctx, String dbName) {
         makeSureInitialized();
-        return HadoopUGI.ugiDoAs(authConf, () -> {
-            List<String> tableNames = null;
-            try {
-                tableNames = catalog.listTables(dbName);
-            } catch (Catalog.DatabaseNotExistException e) {
-                LOG.warn("DatabaseNotExistException", e);
-            }
-            return tableNames;
-        });
+        try {
+            return hadoopAuthenticator.doAs(() -> {
+                List<String> tableNames = null;
+                try {
+                    tableNames = catalog.listTables(dbName);
+                } catch (Catalog.DatabaseNotExistException e) {
+                    LOG.warn("DatabaseNotExistException", e);
+                }
+                return tableNames;
+            });
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to list table names, catalog name: " + getName(), e);
+        }
     }
 
     public org.apache.paimon.table.Table getPaimonTable(String dbName, String tblName) {
         makeSureInitialized();
-        return HadoopUGI.ugiDoAs(authConf, () -> {
-            org.apache.paimon.table.Table table = null;
-            try {
-                table = catalog.getTable(Identifier.create(dbName, tblName));
-            } catch (Catalog.TableNotExistException e) {
-                LOG.warn("TableNotExistException", e);
-            }
-            return table;
-        });
+        try {
+            return hadoopAuthenticator.doAs(() -> {
+                org.apache.paimon.table.Table table = null;
+                try {
+                    table = catalog.getTable(Identifier.create(dbName, tblName));
+                } catch (Catalog.TableNotExistException e) {
+                    LOG.warn("TableNotExistException", e);
+                }
+                return table;
+            });
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to get Paimon table, catalog name: " + getName() + ", db: "
+                    + dbName + ", table: " + tblName, e);
+        }
     }
 
     protected String getPaimonCatalogType(String catalogType) {
@@ -127,15 +145,19 @@ protected String getPaimonCatalogType(String catalogType) {
     }
 
     protected Catalog createCatalog() {
-        return HadoopUGI.ugiDoAs(authConf, () -> {
-            Options options = new Options();
-            Map<String, String> paimonOptionsMap = getPaimonOptionsMap();
-            for (Map.Entry<String, String> kv : paimonOptionsMap.entrySet()) {
-                options.set(kv.getKey(), kv.getValue());
-            }
-            CatalogContext context = CatalogContext.create(options, getConfiguration());
-            return createCatalogImpl(context);
-        });
+        try {
+            return hadoopAuthenticator.doAs(() -> {
+                Options options = new Options();
+                Map<String, String> paimonOptionsMap = getPaimonOptionsMap();
+                for (Map.Entry<String, String> kv : paimonOptionsMap.entrySet()) {
+                    options.set(kv.getKey(), kv.getValue());
+                }
+                CatalogContext context = CatalogContext.create(options, getConfiguration());
+                return createCatalogImpl(context);
+            });
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to create catalog, catalog name: " + getName(), e);
+        }
     }
 
     protected Catalog createCatalogImpl(CatalogContext context) {
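Finally, a brief hedged note on the Paimon changes above: lookups that fail because the database or table is absent still return null (the doAs lambda logs the Paimon not-exist exception and returns null), while Kerberos or I/O failures now surface as a RuntimeException carrying the catalog name. A caller therefore distinguishes "absent" from "broken" by null-checking, as in this illustrative snippet (the variable names and identifiers are placeholders, not from the PR):

```java
// 'paimonCatalog' stands in for a PaimonExternalCatalog instance; "db" and "tbl" are placeholders.
org.apache.paimon.table.Table table = paimonCatalog.getPaimonTable("db", "tbl");
if (table == null) {
    // Absent table: the doAs lambda swallowed Catalog.TableNotExistException and logged it.
} else {
    // Proceed with the Paimon table (schema, reads, etc.).
}
```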