diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticator.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticator.java new file mode 100644 index 00000000000000..6260833b7db558 --- /dev/null +++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticator.java @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.security.authentication; + +import java.security.PrivilegedExceptionAction; +import java.util.concurrent.Callable; + +/** + * PreExecutionAuthenticator is a utility class that ensures specified tasks + * are executed with necessary authentication, particularly useful for systems + * like Hadoop that require Kerberos-based pre-execution authentication. + * + *
<p>
If a HadoopAuthenticator is provided, this class will execute tasks + * within a privileged context using Hadoop's authentication mechanisms + * (such as Kerberos). Otherwise, it will execute tasks normally. + */ +public class PreExecutionAuthenticator { + + private HadoopAuthenticator hadoopAuthenticator; + + /** + * Default constructor for PreExecutionAuthenticator. + * This allows setting the HadoopAuthenticator at a later point if needed. + */ + public PreExecutionAuthenticator() { + } + + /** + * Executes the specified task with necessary authentication. + *
<p>
If a HadoopAuthenticator is set, the task will be executed within a + * privileged context using the doAs method. If no authenticator is present, + * the task will be executed directly. + * + * @param task The task to execute, represented as a Callable + * @param <T> The type of the result returned by the task + * @return The result of the executed task + * @throws Exception If an exception occurs during task execution + */ + public <T> T execute(Callable<T> task) throws Exception { + if (hadoopAuthenticator != null) { + // Adapts Callable to PrivilegedExceptionAction for use with Hadoop authentication + PrivilegedExceptionAction<T> action = new CallableToPrivilegedExceptionActionAdapter<>(task); + return hadoopAuthenticator.doAs(action); + } else { + // Executes the task directly if no authentication is needed + return task.call(); + } + } + + /** + * Retrieves the current HadoopAuthenticator. + *
<p>
This allows checking if a HadoopAuthenticator is configured or + * changing it at runtime. + * + * @return The current HadoopAuthenticator instance, or null if none is set + */ + public HadoopAuthenticator getHadoopAuthenticator() { + return hadoopAuthenticator; + } + + /** + * Sets the HadoopAuthenticator, enabling pre-execution authentication + * for tasks requiring privileged access. + * + * @param hadoopAuthenticator An instance of HadoopAuthenticator to be used + */ + public void setHadoopAuthenticator(HadoopAuthenticator hadoopAuthenticator) { + this.hadoopAuthenticator = hadoopAuthenticator; + } + + /** + * Adapter class to convert a Callable into a PrivilegedExceptionAction. + *
<p>
This is necessary to run the task within a privileged context, + * particularly for Hadoop operations with Kerberos. + * + * @param <T> The type of result returned by the action + */ + public class CallableToPrivilegedExceptionActionAdapter<T> implements PrivilegedExceptionAction<T> { + private final Callable<T> callable; + + /** + * Constructs an adapter that wraps a Callable into a PrivilegedExceptionAction. + * + * @param callable The Callable to be adapted + */ + public CallableToPrivilegedExceptionActionAdapter(Callable<T> callable) { + this.callable = callable; + } + + /** + * Executes the wrapped Callable as a PrivilegedExceptionAction. + * + * @return The result of the callable's call method + * @throws Exception If an exception occurs during callable execution + */ + @Override + public T run() throws Exception { + return callable.call(); + } + } +}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java index 6f79afd5de5d7f..d8dfd1c128f162 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java @@ -17,6 +17,7 @@ package org.apache.doris.datasource.iceberg; +import org.apache.doris.common.security.authentication.PreExecutionAuthenticator; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.InitCatalogLog; import org.apache.doris.datasource.SessionContext; @@ -42,6 +43,8 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog { protected String icebergCatalogType; protected Catalog catalog; + protected PreExecutionAuthenticator preExecutionAuthenticator; + public IcebergExternalCatalog(long catalogId, String name, String comment) { super(catalogId, name, InitCatalogLog.Type.ICEBERG, comment); } @@ -51,6 +54,7 @@ public IcebergExternalCatalog(long catalogId, String name, String comment) { @Override protected void initLocalObjectsImpl() { + preExecutionAuthenticator = new PreExecutionAuthenticator(); initCatalog(); IcebergMetadataOps ops = ExternalMetadataOperations.newIcebergMetadataOps(this, catalog); transactionManager = TransactionManagerFactory.createIcebergTransactionManager(ops);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java index 51d39357b816fa..c5a99c157ce8e6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java @@ -17,6 +17,8 @@ package org.apache.doris.datasource.iceberg; +import org.apache.doris.common.security.authentication.AuthenticationConfig; +import org.apache.doris.common.security.authentication.HadoopAuthenticator; import org.apache.doris.datasource.CatalogProperty; import org.apache.doris.datasource.property.PropertyConverter; @@ -35,6 +37,11 @@ public IcebergHMSExternalCatalog(long catalogId, String name, String resource, M protected void initCatalog() { icebergCatalogType = ICEBERG_HMS; catalog = IcebergUtils.createIcebergHiveCatalog(this, getName()); + if (preExecutionAuthenticator.getHadoopAuthenticator() == null) { + AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(getConfiguration()); + HadoopAuthenticator authenticator =
HadoopAuthenticator.getHadoopAuthenticator(config); + preExecutionAuthenticator.setHadoopAuthenticator(authenticator); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java index 7fa7b5e84e8d1a..87aaca90b95dff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java @@ -28,6 +28,7 @@ import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; +import org.apache.doris.common.security.authentication.PreExecutionAuthenticator; import org.apache.doris.datasource.DorisTypeVisitor; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.ExternalDatabase; @@ -53,11 +54,14 @@ public class IcebergMetadataOps implements ExternalMetadataOps { protected Catalog catalog; protected IcebergExternalCatalog dorisCatalog; protected SupportsNamespaces nsCatalog; + private PreExecutionAuthenticator preExecutionAuthenticator; public IcebergMetadataOps(IcebergExternalCatalog dorisCatalog, Catalog catalog) { this.dorisCatalog = dorisCatalog; this.catalog = catalog; nsCatalog = (SupportsNamespaces) catalog; + this.preExecutionAuthenticator = dorisCatalog.preExecutionAuthenticator; + } public Catalog getCatalog() { @@ -82,9 +86,13 @@ public boolean databaseExist(String dbName) { } public List listDatabaseNames() { - return nsCatalog.listNamespaces().stream() - .map(e -> e.toString()) - .collect(Collectors.toList()); + try { + return preExecutionAuthenticator.execute(() -> nsCatalog.listNamespaces().stream() + .map(Namespace::toString) + .collect(Collectors.toList())); + } catch (Exception e) { + throw new RuntimeException("Failed to list database names, error message is: " + e.getMessage()); + } } @@ -96,6 +104,19 @@ public List listTableNames(String dbName) { @Override public void createDb(CreateDbStmt stmt) throws DdlException { + try { + preExecutionAuthenticator.execute(() -> { + performCreateDb(stmt); + return null; + + }); + } catch (Exception e) { + throw new DdlException("Failed to create database: " + + stmt.getFullDbName() + " ,error message is: " + e.getMessage()); + } + } + + private void performCreateDb(CreateDbStmt stmt) throws DdlException { SupportsNamespaces nsCatalog = (SupportsNamespaces) catalog; String dbName = stmt.getFullDbName(); Map properties = stmt.getProperties(); @@ -110,7 +131,7 @@ public void createDb(CreateDbStmt stmt) throws DdlException { String icebergCatalogType = dorisCatalog.getIcebergCatalogType(); if (!properties.isEmpty() && !IcebergExternalCatalog.ICEBERG_HMS.equals(icebergCatalogType)) { throw new DdlException( - "Not supported: create database with properties for iceberg catalog type: " + icebergCatalogType); + "Not supported: create database with properties for iceberg catalog type: " + icebergCatalogType); } nsCatalog.createNamespace(Namespace.of(dbName), properties); dorisCatalog.onRefreshCache(true); @@ -118,6 +139,17 @@ public void createDb(CreateDbStmt stmt) throws DdlException { @Override public void dropDb(DropDbStmt stmt) throws DdlException { + try { + preExecutionAuthenticator.execute(() -> { + preformDropDb(stmt); + return null; + }); + } catch (Exception e) { + throw new DdlException("Failed to drop database: " + stmt.getDbName() + " ,error message is: ", e); + } + } + + private void 
preformDropDb(DropDbStmt stmt) throws DdlException { String dbName = stmt.getDbName(); if (!databaseExist(dbName)) { if (stmt.isSetIfExists()) { @@ -134,6 +166,15 @@ public void dropDb(DropDbStmt stmt) throws DdlException { @Override public boolean createTable(CreateTableStmt stmt) throws UserException { + try { + preExecutionAuthenticator.execute(() -> performCreateTable(stmt)); + } catch (Exception e) { + throw new DdlException("Failed to create table: " + stmt.getTableName() + " ,error message is:", e); + } + return false; + } + + public boolean performCreateTable(CreateTableStmt stmt) throws UserException { String dbName = stmt.getDbName(); ExternalDatabase db = dorisCatalog.getDbNullable(dbName); if (db == null) { @@ -166,6 +207,17 @@ public boolean createTable(CreateTableStmt stmt) throws UserException { @Override public void dropTable(DropTableStmt stmt) throws DdlException { + try { + preExecutionAuthenticator.execute(() -> { + performDropTable(stmt); + return null; + }); + } catch (Exception e) { + throw new DdlException("Failed to drop table: " + stmt.getTableName() + " ,error message is:", e); + } + } + + private void performDropTable(DropTableStmt stmt) throws DdlException { String dbName = stmt.getDbName(); String tableName = stmt.getTableName(); ExternalDatabase db = dorisCatalog.getDbNullable(dbName); @@ -194,4 +246,8 @@ public void dropTable(DropTableStmt stmt) throws DdlException { public void truncateTable(String dbName, String tblName, List partitions) { throw new UnsupportedOperationException("Truncate Iceberg table is not supported."); } + + public PreExecutionAuthenticator getPreExecutionAuthenticator() { + return preExecutionAuthenticator; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java index c198b58b2a96bd..685915025d665e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java @@ -82,14 +82,23 @@ public void finishInsert(SimpleTableInfo tableInfo, Optional { + //create and start the iceberg transaction + TUpdateMode updateMode = TUpdateMode.APPEND; + if (insertCtx.isPresent()) { + updateMode = ((BaseExternalTableInsertCommandContext) insertCtx.get()).isOverwrite() + ? TUpdateMode.OVERWRITE + : TUpdateMode.APPEND; + } + updateManifestAfterInsert(updateMode); + return null; + }); + } catch (Exception e) { + LOG.warn("Failed to finish insert for iceberg table {}.", tableInfo, e); + throw new RuntimeException(e); } - updateManifestAfterInsert(updateMode); + } private void updateManifestAfterInsert(TUpdateMode updateMode) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java index 66c3ea197101d1..79f7d5b5ad6555 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java @@ -87,6 +87,7 @@ private void createCatalog() throws IOException { hadoopCatalog.setConf(new Configuration()); hadoopCatalog.initialize("df", props); this.externalCatalog = new IcebergHMSExternalCatalog(1L, "iceberg", "", Maps.newHashMap(), ""); + externalCatalog.initLocalObjectsImpl(); new MockUp() { @Mock public Catalog getCatalog() {
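Note: the sketch below is not part of the patch; it only illustrates how the new PreExecutionAuthenticator is meant to be wired up and used, following the pattern the diff applies in IcebergHMSExternalCatalog.initCatalog() and IcebergMetadataOps.listDatabaseNames(). The class name, method name, and the conf/nsCatalog parameters are illustrative assumptions; the PreExecutionAuthenticator, AuthenticationConfig, HadoopAuthenticator, and Iceberg calls are the ones added or used by this patch.

    import org.apache.doris.common.security.authentication.AuthenticationConfig;
    import org.apache.doris.common.security.authentication.HadoopAuthenticator;
    import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.catalog.Namespace;
    import org.apache.iceberg.catalog.SupportsNamespaces;

    import java.util.List;
    import java.util.stream.Collectors;

    // Hypothetical helper class, for illustration only.
    class PreExecutionAuthenticatorUsageSketch {

        List<String> listDbNames(Configuration conf, SupportsNamespaces nsCatalog) throws Exception {
            // Build the authenticator once, as IcebergHMSExternalCatalog.initCatalog() does in the patch.
            PreExecutionAuthenticator auth = new PreExecutionAuthenticator();
            AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(conf);
            auth.setHadoopAuthenticator(HadoopAuthenticator.getHadoopAuthenticator(config));

            // Run the catalog call through execute() so it happens inside a Kerberos doAs() context,
            // mirroring how IcebergMetadataOps.listDatabaseNames() is wrapped above. With no
            // authenticator set, execute() simply calls the task directly.
            return auth.execute(() -> nsCatalog.listNamespaces().stream()
                    .map(Namespace::toString)
                    .collect(Collectors.toList()));
        }
    }

Void operations in the patch (createDb, dropDb, dropTable, finishInsert) follow the same shape, returning null from the lambda so they fit the Callable-based execute() signature.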