diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index 9ad0aa3b8b95bd..740e401a74c992 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -1249,4 +1249,8 @@ public void notifyPropertiesUpdated(Map updatedProps) { Env.getCurrentEnv().getExtMetaCacheMgr().invalidSchemaCache(id); } } + + public ThreadPoolExecutor getThreadPoolExecutor() { + return threadPoolWithPreAuth; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java index 7bc95aa48dd2f4..9f304927b66402 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java @@ -49,6 +49,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ThreadPoolExecutor; import java.util.stream.Collectors; public class IcebergMetadataOps implements ExternalMetadataOps { @@ -346,4 +347,8 @@ private Namespace getNamespace(String dbName) { private Namespace getNamespace() { return externalCatalogName.map(Namespace::of).orElseGet(() -> Namespace.empty()); } + + public ThreadPoolExecutor getThreadPoolWithPreAuth() { + return dorisCatalog.getThreadPoolExecutor(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java index e36db86022e360..797caea0deaf1d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java @@ -155,7 +155,7 @@ private synchronized Table getNativeTable(SimpleTableInfo tableInfo) { private void commitAppendTxn(Table table, List pendingResults) { // commit append files. - AppendFiles appendFiles = table.newAppend(); + AppendFiles appendFiles = table.newAppend().scanManifestsWith(ops.getThreadPoolWithPreAuth()); for (WriteResult result : pendingResults) { Preconditions.checkState(result.referencedDataFiles().length == 0, "Should have no referenced data files for append."); @@ -171,7 +171,7 @@ private void commitReplaceTxn(Table table, List pendingResults) { // 1. if dst_tb is a partitioned table, it will return directly. // 2. if dst_tb is an unpartitioned table, the `dst_tb` table will be emptied. if (!table.spec().isPartitioned()) { - OverwriteFiles overwriteFiles = table.newOverwrite(); + OverwriteFiles overwriteFiles = table.newOverwrite().scanManifestsWith(ops.getThreadPoolWithPreAuth()); try (CloseableIterable fileScanTasks = table.newScan().planFiles()) { fileScanTasks.forEach(f -> overwriteFiles.deleteFile(f.file())); } catch (IOException e) {