@@ -0,0 +1,116 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common.security.authentication;

import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Callable;

/**
* PreExecutionAuthenticator is a utility class that ensures specified tasks
* are executed with necessary authentication, particularly useful for systems
* like Hadoop that require Kerberos-based pre-execution authentication.
*
* <p>If a HadoopAuthenticator is provided, this class will execute tasks
* within a privileged context using Hadoop's authentication mechanisms
* (such as Kerberos). Otherwise, it will execute tasks normally.
*/
public class PreExecutionAuthenticator {

private HadoopAuthenticator hadoopAuthenticator;

/**
* Default constructor for PreExecutionAuthenticator.
* This allows setting the HadoopAuthenticator at a later point if needed.
*/
public PreExecutionAuthenticator() {
}

/**
* Executes the specified task with necessary authentication.
* <p>If a HadoopAuthenticator is set, the task will be executed within a
* privileged context using the doAs method. If no authenticator is present,
* the task will be executed directly.
*
* @param task The task to execute, represented as a Callable
* @param <T> The type of the result returned by the task
* @return The result of the executed task
* @throws Exception If an exception occurs during task execution
*/
public <T> T execute(Callable<T> task) throws Exception {
if (hadoopAuthenticator != null) {
// Adapts Callable to PrivilegedExceptionAction for use with Hadoop authentication
PrivilegedExceptionAction<T> action = new CallableToPrivilegedExceptionActionAdapter<>(task);
return hadoopAuthenticator.doAs(action);
} else {
// Executes the task directly if no authentication is needed
return task.call();
}
}

/**
* Retrieves the current HadoopAuthenticator.
* <p>This allows checking if a HadoopAuthenticator is configured or
* changing it at runtime.
*
* @return The current HadoopAuthenticator instance, or null if none is set
*/
public HadoopAuthenticator getHadoopAuthenticator() {
return hadoopAuthenticator;
}

/**
* Sets the HadoopAuthenticator, enabling pre-execution authentication
* for tasks requiring privileged access.
*
* @param hadoopAuthenticator An instance of HadoopAuthenticator to be used
*/
public void setHadoopAuthenticator(HadoopAuthenticator hadoopAuthenticator) {
this.hadoopAuthenticator = hadoopAuthenticator;
}

/**
* Adapter class to convert a Callable into a PrivilegedExceptionAction.
* <p>This is necessary to run the task within a privileged context,
* particularly for Hadoop operations with Kerberos.
*
* @param <T> The type of result returned by the action
*/
public static class CallableToPrivilegedExceptionActionAdapter<T> implements PrivilegedExceptionAction<T> {
private final Callable<T> callable;

/**
* Constructs an adapter that wraps a Callable into a PrivilegedExceptionAction.
*
* @param callable The Callable to be adapted
*/
public CallableToPrivilegedExceptionActionAdapter(Callable<T> callable) {
this.callable = callable;
}

/**
* Executes the wrapped Callable as a PrivilegedExceptionAction.
*
* @return The result of the callable's call method
* @throws Exception If an exception occurs during callable execution
*/
@Override
public T run() throws Exception {
return callable.call();
}
}
}
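For context, a minimal wiring sketch for this class (a hypothetical standalone caller, not part of this PR). It mirrors the IcebergHMSExternalCatalog change further down and assumes the Hadoop Configuration already carries valid Kerberos settings:

// Hypothetical caller; conf is assumed to hold the Kerberos settings that
// AuthenticationConfig.getKerberosConfig(...) expects.
Configuration conf = new Configuration();
PreExecutionAuthenticator auth = new PreExecutionAuthenticator();
AuthenticationConfig kerberosConfig = AuthenticationConfig.getKerberosConfig(conf);
auth.setHadoopAuthenticator(HadoopAuthenticator.getHadoopAuthenticator(kerberosConfig));

// The task now runs inside the authenticator's doAs(...) context; with no
// authenticator set, execute(...) simply invokes task.call() directly.
String user = auth.execute(() -> UserGroupInformation.getCurrentUser().getUserName());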
@@ -17,6 +17,7 @@

package org.apache.doris.datasource.iceberg;

import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.SessionContext;
@@ -42,6 +43,8 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
protected String icebergCatalogType;
protected Catalog catalog;

protected PreExecutionAuthenticator preExecutionAuthenticator;

public IcebergExternalCatalog(long catalogId, String name, String comment) {
super(catalogId, name, InitCatalogLog.Type.ICEBERG, comment);
}
@@ -51,6 +54,7 @@ public IcebergExternalCatalog(long catalogId, String name, String comment) {

@Override
protected void initLocalObjectsImpl() {
preExecutionAuthenticator = new PreExecutionAuthenticator();
initCatalog();
IcebergMetadataOps ops = ExternalMetadataOperations.newIcebergMetadataOps(this, catalog);
transactionManager = TransactionManagerFactory.createIcebergTransactionManager(ops);
@@ -17,6 +17,8 @@

package org.apache.doris.datasource.iceberg;

import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.security.authentication.HadoopAuthenticator;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.property.PropertyConverter;

@@ -35,6 +37,11 @@ public IcebergHMSExternalCatalog(long catalogId, String name, String resource, M
protected void initCatalog() {
icebergCatalogType = ICEBERG_HMS;
catalog = IcebergUtils.createIcebergHiveCatalog(this, getName());
if (preExecutionAuthenticator.getHadoopAuthenticator() == null) {
AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(getConfiguration());
HadoopAuthenticator authenticator = HadoopAuthenticator.getHadoopAuthenticator(config);
preExecutionAuthenticator.setHadoopAuthenticator(authenticator);
}
}
}
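The PR itself does not touch HadoopAuthenticator. As a rough mental model only (an assumed sketch, not the actual Doris implementation), a Kerberos-backed doAs typically reduces to Hadoop's UserGroupInformation:

// Assumed sketch only; principal and keytabPath are hypothetical fields, and
// real implementations usually cache the UGI and re-login before ticket expiry.
public <T> T doAs(PrivilegedExceptionAction<T> action) throws Exception {
    UserGroupInformation ugi =
            UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytabPath);
    return ugi.doAs(action);
}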

@@ -28,6 +28,7 @@
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.datasource.DorisTypeVisitor;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalDatabase;
@@ -53,11 +54,14 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
protected Catalog catalog;
protected IcebergExternalCatalog dorisCatalog;
protected SupportsNamespaces nsCatalog;
private PreExecutionAuthenticator preExecutionAuthenticator;

public IcebergMetadataOps(IcebergExternalCatalog dorisCatalog, Catalog catalog) {
this.dorisCatalog = dorisCatalog;
this.catalog = catalog;
nsCatalog = (SupportsNamespaces) catalog;
this.preExecutionAuthenticator = dorisCatalog.preExecutionAuthenticator;
}

public Catalog getCatalog() {
@@ -82,9 +86,13 @@ public boolean databaseExist(String dbName) {
}

public List<String> listDatabaseNames() {
return nsCatalog.listNamespaces().stream()
.map(e -> e.toString())
.collect(Collectors.toList());
try {
return preExecutionAuthenticator.execute(() -> nsCatalog.listNamespaces().stream()
.map(Namespace::toString)
.collect(Collectors.toList()));
} catch (Exception e) {
throw new RuntimeException("Failed to list database names, error message is: " + e.getMessage(), e);
}
}

@Override
@@ -95,6 +103,19 @@ public List<String> listTableNames(String dbName) {

@Override
public void createDb(CreateDbStmt stmt) throws DdlException {
try {
preExecutionAuthenticator.execute(() -> {
performCreateDb(stmt);
return null;
});
} catch (Exception e) {
throw new DdlException("Failed to create database: "
+ stmt.getFullDbName() + ", error message is: " + e.getMessage(), e);
}
}

private void performCreateDb(CreateDbStmt stmt) throws DdlException {
SupportsNamespaces nsCatalog = (SupportsNamespaces) catalog;
String dbName = stmt.getFullDbName();
Map<String, String> properties = stmt.getProperties();
@@ -109,14 +130,25 @@ public void createDb(CreateDbStmt stmt) throws DdlException {
String icebergCatalogType = dorisCatalog.getIcebergCatalogType();
if (!properties.isEmpty() && !IcebergExternalCatalog.ICEBERG_HMS.equals(icebergCatalogType)) {
throw new DdlException(
"Not supported: create database with properties for iceberg catalog type: " + icebergCatalogType);
"Not supported: create database with properties for iceberg catalog type: " + icebergCatalogType);
}
nsCatalog.createNamespace(Namespace.of(dbName), properties);
dorisCatalog.onRefreshCache(true);
}

@Override
public void dropDb(DropDbStmt stmt) throws DdlException {
try {
preExecutionAuthenticator.execute(() -> {
performDropDb(stmt);
return null;
});
} catch (Exception e) {
throw new DdlException("Failed to drop database: " + stmt.getDbName() + ", error message is: " + e.getMessage(), e);
}
}

private void performDropDb(DropDbStmt stmt) throws DdlException {
String dbName = stmt.getDbName();
if (!databaseExist(dbName)) {
if (stmt.isSetIfExists()) {
@@ -133,6 +165,15 @@ public void dropDb(DropDbStmt stmt) throws DdlException {

@Override
public boolean createTable(CreateTableStmt stmt) throws UserException {
try {
return preExecutionAuthenticator.execute(() -> performCreateTable(stmt));
} catch (Exception e) {
throw new DdlException("Failed to create table: " + stmt.getTableName()
+ ", error message is: " + e.getMessage(), e);
}
}

private boolean performCreateTable(CreateTableStmt stmt) throws UserException {
String dbName = stmt.getDbName();
ExternalDatabase<?> db = dorisCatalog.getDbNullable(dbName);
if (db == null) {
@@ -165,6 +206,17 @@ public boolean createTable(CreateTableStmt stmt) throws UserException {

@Override
public void dropTable(DropTableStmt stmt) throws DdlException {
try {
preExecutionAuthenticator.execute(() -> {
performDropTable(stmt);
return null;
});
} catch (Exception e) {
throw new DdlException("Failed to drop table: " + stmt.getTableName() + ", error message is: " + e.getMessage(), e);
}
}

private void performDropTable(DropTableStmt stmt) throws DdlException {
String dbName = stmt.getDbName();
String tableName = stmt.getTableName();
ExternalDatabase<?> db = dorisCatalog.getDbNullable(dbName);
@@ -193,4 +245,8 @@ public void dropTable(DropTableStmt stmt) throws DdlException {
public void truncateTable(String dbName, String tblName, List<String> partitions) {
throw new UnsupportedOperationException("Truncate Iceberg table is not supported.");
}

public PreExecutionAuthenticator getPreExecutionAuthenticator() {
return preExecutionAuthenticator;
}
}
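A pattern worth noting in the call sites above: execute(...) takes a Callable<T>, so void operations such as performCreateDb are adapted by returning null from the lambda. A small helper along these lines (hypothetical, not part of this PR) would remove that repetition:

// Hypothetical convenience wrapper: runs a void body under the authenticator
// by adapting it to Callable<Void>.
@FunctionalInterface
interface ThrowingRunnable {
    void run() throws Exception;
}

static void executeVoid(PreExecutionAuthenticator auth, ThrowingRunnable body) throws Exception {
    auth.execute(() -> {
        body.run();
        return null; // Callable<Void> must return a value
    });
}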
@@ -82,14 +82,23 @@ public void finishInsert(SimpleTableInfo tableInfo, Optional<InsertCommandContex
if (LOG.isDebugEnabled()) {
LOG.debug("iceberg table {} insert finished!", tableInfo);
}

//create and start the iceberg transaction
TUpdateMode updateMode = TUpdateMode.APPEND;
if (insertCtx.isPresent()) {
updateMode = ((BaseExternalTableInsertCommandContext) insertCtx.get()).isOverwrite() ? TUpdateMode.OVERWRITE
: TUpdateMode.APPEND;
try {
ops.getPreExecutionAuthenticator().execute(() -> {
// create and start the iceberg transaction
TUpdateMode updateMode = TUpdateMode.APPEND;
if (insertCtx.isPresent()) {
updateMode = ((BaseExternalTableInsertCommandContext) insertCtx.get()).isOverwrite()
? TUpdateMode.OVERWRITE
: TUpdateMode.APPEND;
}
updateManifestAfterInsert(updateMode);
return null;
});
} catch (Exception e) {
LOG.warn("Failed to finish insert for iceberg table {}.", tableInfo, e);
throw new RuntimeException(e);
}
updateManifestAfterInsert(updateMode);

}

private void updateManifestAfterInsert(TUpdateMode updateMode) {
@@ -87,6 +87,7 @@ private void createCatalog() throws IOException {
hadoopCatalog.setConf(new Configuration());
hadoopCatalog.initialize("df", props);
this.externalCatalog = new IcebergHMSExternalCatalog(1L, "iceberg", "", Maps.newHashMap(), "");
externalCatalog.initLocalObjectsImpl();
new MockUp<IcebergHMSExternalCatalog>() {
@Mock
public Catalog getCatalog() {