From 1cdba2cf72fbb2a77dcd8ead9595a04f25260a1a Mon Sep 17 00:00:00 2001
From: wuwenchi
Date: Thu, 5 Jun 2025 17:35:31 +0800
Subject: [PATCH 1/2] [fix](iceberg)Fix the thread pool issue used for commit.
 (#51508)

### What problem does this PR solve?

Problem Summary:

When Iceberg generates a new snapshot, it performs a merge operation based on
the previous snapshot. This operation reads manifest files, and those reads go
through Iceberg's global worker thread pool. However, users may have their own
authentication information, which requires running the reads under doAs so the
proper security context is preserved. Therefore, the thread pool provided by
Iceberg cannot be used; the catalog must supply its own thread pool whose
threads carry the user's authentication context.

### Release note

None
---
 .../java/org/apache/doris/datasource/ExternalCatalog.java   | 4 ++++
 .../apache/doris/datasource/iceberg/IcebergMetadataOps.java | 5 +++++
 .../apache/doris/datasource/iceberg/IcebergTransaction.java | 4 ++--
 3 files changed, 11 insertions(+), 2 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index bd3d966ac37a6f..458755168576c8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -1170,4 +1170,8 @@ public void setAutoAnalyzePolicy(String dbName, String tableName, String policy)
             tableAutoAnalyzePolicy.put(key, policy);
         }
     }
+
+    public ThreadPoolExecutor getThreadPoolExecutor() {
+        return threadPoolWithPreAuth;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
index 4c06068aa3beaf..78c4ef8fec2fb7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
@@ -49,6 +49,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.stream.Collectors;
 
 public class IcebergMetadataOps implements ExternalMetadataOps {
@@ -291,4 +292,8 @@ private Namespace getNamespace(String dbName) {
     private Namespace getNamespace() {
         return externalCatalogName.map(Namespace::of).orElseGet(() -> Namespace.empty());
     }
+
+    public ThreadPoolExecutor getThreadPoolWithPreAuth() {
+        return dorisCatalog.getThreadPoolExecutor();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
index e36db86022e360..797caea0deaf1d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
@@ -155,7 +155,7 @@ private synchronized Table getNativeTable(SimpleTableInfo tableInfo) {
 
     private void commitAppendTxn(Table table, List<WriteResult> pendingResults) {
         // commit append files.
-        AppendFiles appendFiles = table.newAppend();
+        AppendFiles appendFiles = table.newAppend().scanManifestsWith(ops.getThreadPoolWithPreAuth());
         for (WriteResult result : pendingResults) {
             Preconditions.checkState(result.referencedDataFiles().length == 0,
                     "Should have no referenced data files for append.");
@@ -171,7 +171,7 @@ private void commitReplaceTxn(Table table, List<WriteResult> pendingResults) {
         // 1. if dst_tb is a partitioned table, it will return directly.
         // 2. if dst_tb is an unpartitioned table, the `dst_tb` table will be emptied.
         if (!table.spec().isPartitioned()) {
-            OverwriteFiles overwriteFiles = table.newOverwrite();
+            OverwriteFiles overwriteFiles = table.newOverwrite().scanManifestsWith(ops.getThreadPoolWithPreAuth());
             try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) {
                 fileScanTasks.forEach(f -> overwriteFiles.deleteFile(f.file()));
             } catch (IOException e) {
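For illustration only: the mechanism PATCH 1/2 relies on is Iceberg's `SnapshotUpdate.scanManifestsWith(ExecutorService)`, which lets the caller supply the executor used to read manifests during commit planning instead of Iceberg's shared worker pool. The sketch below shows the idea with a Hadoop `UserGroupInformation.doAs` wrapper; the class name, pool size, and use of UGI are illustrative assumptions, while the patch itself routes work through Doris' `PreExecutionAuthenticator` (see PATCH 2/2).

```java
// Hedged sketch (not part of this patch): run Iceberg's manifest scanning on an
// executor whose threads carry the caller's doAs context.
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;

import java.security.PrivilegedExceptionAction;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AuthAwareCommitSketch {
    public static void commitWithUserContext(Table table, DataFile dataFile, UserGroupInformation ugi) {
        // Each worker thread re-enters ugi.doAs(...), so manifest reads triggered
        // by the commit run with the user's credentials, not the process defaults.
        ExecutorService scanPool = Executors.newFixedThreadPool(4, runnable -> new Thread(() -> {
            try {
                ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
                    runnable.run();
                    return null;
                });
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }));
        try {
            AppendFiles append = table.newAppend()
                    .appendFile(dataFile)
                    .scanManifestsWith(scanPool); // instead of Iceberg's global worker pool
            append.commit();
        } finally {
            scanPool.shutdown();
        }
    }
}
```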
From 7246439177763c45654e04f160c70f7296733c03 Mon Sep 17 00:00:00 2001
From: "Mingyu Chen (Rayner)"
Date: Wed, 9 Jul 2025 07:23:22 -0700
Subject: [PATCH 2/2] fix

---
 .../java/org/apache/doris/common/ThreadPoolManager.java     | 68 +++++++++++++++++++
 .../java/org/apache/doris/datasource/ExternalCatalog.java   |  8 +++
 .../apache/doris/datasource/hive/HMSExternalCatalog.java    |  6 ++
 .../doris/datasource/iceberg/IcebergExternalCatalog.java    |  7 ++
 4 files changed, 89 insertions(+)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
index 3be5af8ac54eba..0061e3702a145c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.doris.common;
 
+import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
 import org.apache.doris.metric.Metric;
 import org.apache.doris.metric.Metric.MetricUnit;
 import org.apache.doris.metric.MetricLabel;
@@ -33,6 +34,7 @@
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
@@ -68,6 +70,7 @@
  */
 public class ThreadPoolManager {
+    private static final Logger LOG = LogManager.getLogger(ThreadPoolManager.class);
 
     private static Map<String, ThreadPoolExecutor> nameToThreadPoolMap = Maps.newConcurrentMap();
@@ -126,6 +129,50 @@
                 TimeUnit.SECONDS, new SynchronousQueue(), new LogDiscardPolicyThrowException(poolName),
                 poolName, needRegisterMetric);
     }
+    public static ThreadPoolExecutor newDaemonFixedThreadPoolWithPreAuth(
+            int numThread,
+            int queueSize,
+            String poolName,
+            boolean needRegisterMetric,
+            PreExecutionAuthenticator preAuth) {
+        return newDaemonThreadPoolWithPreAuth(numThread, numThread, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(queueSize), new BlockedPolicy(poolName, 60),
+                poolName, needRegisterMetric, preAuth);
+    }
+
+    public static ThreadPoolExecutor newDaemonThreadPoolWithPreAuth(
+            int corePoolSize,
+            int maximumPoolSize,
+            long keepAliveTime,
+            TimeUnit unit,
+            BlockingQueue<Runnable> workQueue,
+            RejectedExecutionHandler handler,
+            String poolName,
+            boolean needRegisterMetric,
+            PreExecutionAuthenticator preAuth) {
+        ThreadFactory threadFactory = namedThreadFactoryWithPreAuth(poolName, preAuth);
+        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
+                keepAliveTime, unit, workQueue, threadFactory, handler);
+        if (needRegisterMetric) {
+            nameToThreadPoolMap.put(poolName, threadPool);
+        }
+        return threadPool;
+    }
+
+    private static ThreadFactory namedThreadFactoryWithPreAuth(String poolName, PreExecutionAuthenticator preAuth) {
+        return new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat(poolName + "-%d")
+                .setThreadFactory(runnable -> new Thread(() -> {
+                    try {
+                        preAuth.execute(runnable);
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                }))
+                .build();
+    }
+
     public static ThreadPoolExecutor newDaemonFixedThreadPool(int numThread, int queueSize, String poolName,
             boolean needRegisterMetric) {
         return newDaemonThreadPool(numThread, numThread, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
@@ -377,4 +424,25 @@ public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
             }
         }
     }
+
+    public static void shutdownExecutorService(ExecutorService executorService) {
+        // Disable new tasks from being submitted
+        executorService.shutdown();
+        try {
+            // Wait a while for existing tasks to terminate
+            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
+                // Cancel currently executing tasks
+                executorService.shutdownNow();
+                // Wait a while for tasks to respond to being cancelled
+                if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
+                    LOG.warn("ExecutorService did not terminate");
+                }
+            }
+        } catch (InterruptedException e) {
+            // (Re-)Cancel if current thread also interrupted
+            executorService.shutdownNow();
+            // Preserve interrupt status
+            Thread.currentThread().interrupt();
+        }
+    }
 }
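For reference, a minimal usage sketch of the helpers added to `ThreadPoolManager` above. The pool name, size, and the `needRegisterMetric` flag are placeholder values, and how the `PreExecutionAuthenticator` is obtained and configured is outside the scope of this sketch; worker threads created by the pool run inside `preAuth.execute(...)`, so the tasks they pick up execute in the user's authentication context.

```java
// Hedged usage sketch of the new pre-auth pool helpers (names as added in this patch).
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;

import java.util.concurrent.ThreadPoolExecutor;

public class PreAuthPoolSketch {
    public static void runOnPreAuthPool(PreExecutionAuthenticator preAuth) {
        // Fixed-size daemon pool; the thread factory hands each worker thread to
        // preAuth.execute(...), so everything the worker runs sees the auth context.
        ThreadPoolExecutor pool = ThreadPoolManager.newDaemonFixedThreadPoolWithPreAuth(
                4, Integer.MAX_VALUE, "demo_pre_auth_pool", false, preAuth);
        try {
            pool.submit(() -> System.out.println("runs with pre-execution auth applied"));
        } finally {
            // Two-phase shutdown added in this patch: shutdown(), then shutdownNow() on timeout.
            ThreadPoolManager.shutdownExecutorService(pool);
        }
    }
}
```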
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 458755168576c8..6052be35f57336 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -35,6 +35,7 @@
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.Version;
 import org.apache.doris.common.io.Text;
@@ -91,6 +92,7 @@
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Set;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.stream.Collectors;
 
 /**
@@ -117,6 +119,8 @@ public abstract class ExternalCatalog
             CREATE_TIME,
             USE_META_CACHE);
 
+    protected static final int ICEBERG_CATALOG_EXECUTOR_THREAD_NUM = Runtime.getRuntime().availableProcessors();
+
     // Unique id of this catalog, will be assigned after catalog is loaded.
     @SerializedName(value = "id")
     protected long id;
@@ -157,6 +161,7 @@ public abstract class ExternalCatalog
     protected Optional<Boolean> useMetaCache = Optional.empty();
     protected MetaCache<ExternalDatabase<? extends ExternalTable>> metaCache;
     protected PreExecutionAuthenticator preExecutionAuthenticator;
+    protected ThreadPoolExecutor threadPoolWithPreAuth;
 
     private volatile Configuration cachedConf = null;
     private byte[] confLock = new byte[0];
@@ -759,6 +764,9 @@ public void onClose() {
         if (null != transactionManager) {
             transactionManager = null;
         }
+        if (threadPoolWithPreAuth != null) {
+            ThreadPoolManager.shutdownExecutorService(threadPoolWithPreAuth);
+        }
         CatalogIf.super.onClose();
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
index defa2a8ebe9e14..73598adfcaf54b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
@@ -197,6 +197,12 @@ protected void initLocalObjectsImpl() {
                     String.valueOf(Config.hive_metastore_client_timeout_second));
         }
         HiveMetadataOps hiveOps = ExternalMetadataOperations.newHiveMetadataOps(hiveConf, jdbcClientConfig, this);
+        threadPoolWithPreAuth = ThreadPoolManager.newDaemonFixedThreadPoolWithPreAuth(
+                ICEBERG_CATALOG_EXECUTOR_THREAD_NUM,
+                Integer.MAX_VALUE,
+                String.format("hms_iceberg_catalog_%s_executor_pool", name),
+                true,
+                preExecutionAuthenticator);
         FileSystemProvider fileSystemProvider = new FileSystemProviderImpl(Env.getCurrentEnv().getExtMetaCacheMgr(),
                 this.bindBrokerName(), this.catalogProperty.getHadoopProperties());
         this.fileSystemExecutor = ThreadPoolManager.newDaemonFixedThreadPool(FILE_SYSTEM_EXECUTOR_THREAD_NUM,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index 30590f5af26070..e25199adfbeaf7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.datasource.iceberg;
 
+import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.InitCatalogLog;
@@ -65,6 +66,12 @@ protected void initLocalObjectsImpl() {
         initCatalog();
         IcebergMetadataOps ops = ExternalMetadataOperations.newIcebergMetadataOps(this, catalog);
         transactionManager = TransactionManagerFactory.createIcebergTransactionManager(ops);
+        threadPoolWithPreAuth = ThreadPoolManager.newDaemonFixedThreadPoolWithPreAuth(
+                ICEBERG_CATALOG_EXECUTOR_THREAD_NUM,
+                Integer.MAX_VALUE,
+                String.format("iceberg_catalog_%s_executor_pool", name),
+                true,
+                preExecutionAuthenticator);
         metadataOps = ops;
     }
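A standalone sketch of the thread-factory wrapping pattern used by `namedThreadFactoryWithPreAuth` above, with a stand-in `Authenticator` interface instead of Doris' `PreExecutionAuthenticator`. The factory wraps each worker thread, so every task that worker picks up runs inside the authenticator's `execute(...)` call; all names here are illustrative.

```java
// Hedged demonstration of wrapping pool worker threads in an auth context.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class WrappingFactoryDemo {
    interface Authenticator {
        void execute(Runnable task) throws Exception;
    }

    public static void main(String[] args) throws InterruptedException {
        Authenticator auth = task -> {
            System.out.println("acquire auth context on " + Thread.currentThread().getName());
            task.run(); // the worker loop (and all tasks it executes) runs here
            System.out.println("release auth context");
        };
        // Same shape as the patch: the factory builds threads whose run() delegates
        // the worker runnable to the authenticator.
        ExecutorService pool = Executors.newFixedThreadPool(2, runnable -> new Thread(() -> {
            try {
                auth.execute(runnable);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }));
        pool.submit(() -> System.out.println("task body runs inside auth context"));
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```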