diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java index dcb1f704271eab..24f6a9de2bcaf9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java @@ -18,6 +18,7 @@ package org.apache.doris.common; +import org.apache.doris.common.security.authentication.PreExecutionAuthenticator; import org.apache.doris.metric.Metric; import org.apache.doris.metric.Metric.MetricUnit; import org.apache.doris.metric.MetricLabel; @@ -33,6 +34,7 @@ import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.PriorityBlockingQueue; @@ -68,6 +70,7 @@ */ public class ThreadPoolManager { + private static final Logger LOG = LogManager.getLogger(ThreadPoolManager.class); private static Map nameToThreadPoolMap = Maps.newConcurrentMap(); @@ -119,6 +122,50 @@ TimeUnit.SECONDS, new SynchronousQueue(), new LogDiscardPolicy(poolName), poolName, needRegisterMetric); } + public static ThreadPoolExecutor newDaemonFixedThreadPoolWithPreAuth( + int numThread, + int queueSize, + String poolName, + boolean needRegisterMetric, + PreExecutionAuthenticator preAuth) { + return newDaemonThreadPoolWithPreAuth(numThread, numThread, KEEP_ALIVE_TIME, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(queueSize), new BlockedPolicy(poolName, 60), + poolName, needRegisterMetric, preAuth); + } + + public static ThreadPoolExecutor newDaemonThreadPoolWithPreAuth( + int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + RejectedExecutionHandler handler, + String poolName, + boolean needRegisterMetric, + PreExecutionAuthenticator preAuth) { + ThreadFactory threadFactory = namedThreadFactoryWithPreAuth(poolName, preAuth); + ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, + keepAliveTime, unit, workQueue, threadFactory, handler); + if (needRegisterMetric) { + nameToThreadPoolMap.put(poolName, threadPool); + } + return threadPool; + } + + private static ThreadFactory namedThreadFactoryWithPreAuth(String poolName, PreExecutionAuthenticator preAuth) { + return new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat(poolName + "-%d") + .setThreadFactory(runnable -> new Thread(() -> { + try { + preAuth.execute(runnable); + } catch (Exception e) { + throw new RuntimeException(e); + } + })) + .build(); + } + public static ThreadPoolExecutor newDaemonCacheThreadPoolUseBlockedPolicy(int maxNumThread, String poolName, boolean needRegisterMetric) { return newDaemonThreadPool(0, maxNumThread, KEEP_ALIVE_TIME, @@ -384,4 +431,25 @@ public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) { } } } + + public static void shutdownExecutorService(ExecutorService executorService) { + // Disable new tasks from being submitted + executorService.shutdown(); + try { + // Wait a while for existing tasks to terminate + if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { + // Cancel currently executing tasks + executorService.shutdownNow(); + // Wait a while for tasks to respond to being cancelled + if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { + LOG.warn("ExecutorService did not terminate"); + } + } + } catch (InterruptedException e) { + // (Re-)Cancel if current thread also interrupted + executorService.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index 23d6cb958d414d..86c5a3ae3e676e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -35,6 +35,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; +import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.UserException; import org.apache.doris.common.Version; import org.apache.doris.common.io.Text; @@ -94,6 +95,7 @@ import java.util.Optional; import java.util.OptionalLong; import java.util.Set; +import java.util.concurrent.ThreadPoolExecutor; import java.util.stream.Collectors; /** @@ -128,6 +130,8 @@ public abstract class ExternalCatalog CREATE_TIME, USE_META_CACHE); + protected static final int ICEBERG_CATALOG_EXECUTOR_THREAD_NUM = Runtime.getRuntime().availableProcessors(); + // Unique id of this catalog, will be assigned after catalog is loaded. @SerializedName(value = "id") protected long id; @@ -168,6 +172,7 @@ public abstract class ExternalCatalog protected Optional useMetaCache = Optional.empty(); protected MetaCache> metaCache; protected PreExecutionAuthenticator preExecutionAuthenticator; + protected ThreadPoolExecutor threadPoolWithPreAuth; private volatile Configuration cachedConf = null; private byte[] confLock = new byte[0]; @@ -776,6 +781,9 @@ public void onClose() { if (null != transactionManager) { transactionManager = null; } + if (threadPoolWithPreAuth != null) { + ThreadPoolManager.shutdownExecutorService(threadPoolWithPreAuth); + } CatalogIf.super.onClose(); } @@ -1197,4 +1205,8 @@ public void notifyPropertiesUpdated(Map updatedProps) { Env.getCurrentEnv().getExtMetaCacheMgr().invalidSchemaCache(id); } } + + public ThreadPoolExecutor getThreadPoolExecutor() { + return threadPoolWithPreAuth; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java index 2442eed0b9c36a..5d9128bcc04bd7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java @@ -188,6 +188,12 @@ protected void initLocalObjectsImpl() { String.valueOf(Config.hive_metastore_client_timeout_second)); } HiveMetadataOps hiveOps = ExternalMetadataOperations.newHiveMetadataOps(hiveConf, jdbcClientConfig, this); + threadPoolWithPreAuth = ThreadPoolManager.newDaemonFixedThreadPoolWithPreAuth( + ICEBERG_CATALOG_EXECUTOR_THREAD_NUM, + Integer.MAX_VALUE, + String.format("hms_iceberg_catalog_%s_executor_pool", name), + true, + preExecutionAuthenticator); FileSystemProvider fileSystemProvider = new FileSystemProviderImpl(Env.getCurrentEnv().getExtMetaCacheMgr(), this.bindBrokerName(), this.catalogProperty.getHadoopProperties()); this.fileSystemExecutor = ThreadPoolManager.newDaemonFixedThreadPool(FILE_SYSTEM_EXECUTOR_THREAD_NUM, diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java index 30590f5af26070..e25199adfbeaf7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java @@ -17,6 +17,7 @@ package org.apache.doris.datasource.iceberg; +import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.security.authentication.PreExecutionAuthenticator; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.InitCatalogLog; @@ -65,6 +66,12 @@ protected void initLocalObjectsImpl() { initCatalog(); IcebergMetadataOps ops = ExternalMetadataOperations.newIcebergMetadataOps(this, catalog); transactionManager = TransactionManagerFactory.createIcebergTransactionManager(ops); + threadPoolWithPreAuth = ThreadPoolManager.newDaemonFixedThreadPoolWithPreAuth( + ICEBERG_CATALOG_EXECUTOR_THREAD_NUM, + Integer.MAX_VALUE, + String.format("iceberg_catalog_%s_executor_pool", name), + true, + preExecutionAuthenticator); metadataOps = ops; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java index 787d706132e3cb..14a64163265384 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java @@ -49,6 +49,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ThreadPoolExecutor; import java.util.stream.Collectors; public class IcebergMetadataOps implements ExternalMetadataOps { @@ -292,4 +293,8 @@ private Namespace getNamespace(String dbName) { private Namespace getNamespace() { return externalCatalogName.map(Namespace::of).orElseGet(() -> Namespace.empty()); } + + public ThreadPoolExecutor getThreadPoolWithPreAuth() { + return dorisCatalog.getThreadPoolExecutor(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java index e36db86022e360..797caea0deaf1d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java @@ -155,7 +155,7 @@ private synchronized Table getNativeTable(SimpleTableInfo tableInfo) { private void commitAppendTxn(Table table, List pendingResults) { // commit append files. - AppendFiles appendFiles = table.newAppend(); + AppendFiles appendFiles = table.newAppend().scanManifestsWith(ops.getThreadPoolWithPreAuth()); for (WriteResult result : pendingResults) { Preconditions.checkState(result.referencedDataFiles().length == 0, "Should have no referenced data files for append."); @@ -171,7 +171,7 @@ private void commitReplaceTxn(Table table, List pendingResults) { // 1. if dst_tb is a partitioned table, it will return directly. // 2. if dst_tb is an unpartitioned table, the `dst_tb` table will be emptied. if (!table.spec().isPartitioned()) { - OverwriteFiles overwriteFiles = table.newOverwrite(); + OverwriteFiles overwriteFiles = table.newOverwrite().scanManifestsWith(ops.getThreadPoolWithPreAuth()); try (CloseableIterable fileScanTasks = table.newScan().planFiles()) { fileScanTasks.forEach(f -> overwriteFiles.deleteFile(f.file())); } catch (IOException e) {