diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java index 797caea0deaf1d..c7d7212335a224 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java @@ -111,8 +111,8 @@ public void finishInsert(SimpleTableInfo tableInfo, Optional pendingResults; if (commitDataList.isEmpty()) { @@ -125,9 +125,9 @@ private void updateManifestAfterInsert(TUpdateMode updateMode) { } if (updateMode == TUpdateMode.APPEND) { - commitAppendTxn(table, pendingResults); + commitAppendTxn(pendingResults); } else { - commitReplaceTxn(table, pendingResults); + commitReplaceTxn(pendingResults); } } @@ -146,16 +146,15 @@ public long getUpdateCnt() { return commitDataList.stream().mapToLong(TIcebergCommitData::getRowCount).sum(); } - private synchronized Table getNativeTable(SimpleTableInfo tableInfo) { Objects.requireNonNull(tableInfo); ExternalCatalog externalCatalog = ops.getExternalCatalog(); return IcebergUtils.getRemoteTable(externalCatalog, tableInfo); } - private void commitAppendTxn(Table table, List pendingResults) { + private void commitAppendTxn(List pendingResults) { // commit append files. - AppendFiles appendFiles = table.newAppend().scanManifestsWith(ops.getThreadPoolWithPreAuth()); + AppendFiles appendFiles = transaction.newAppend().scanManifestsWith(ops.getThreadPoolWithPreAuth()); for (WriteResult result : pendingResults) { Preconditions.checkState(result.referencedDataFiles().length == 0, "Should have no referenced data files for append."); @@ -165,13 +164,15 @@ private void commitAppendTxn(Table table, List pendingResults) { } - private void commitReplaceTxn(Table table, List pendingResults) { + private void commitReplaceTxn(List pendingResults) { if (pendingResults.isEmpty()) { // such as : insert overwrite table `dst_tb` select * from `empty_tb` // 1. if dst_tb is a partitioned table, it will return directly. // 2. if dst_tb is an unpartitioned table, the `dst_tb` table will be emptied. - if (!table.spec().isPartitioned()) { - OverwriteFiles overwriteFiles = table.newOverwrite().scanManifestsWith(ops.getThreadPoolWithPreAuth()); + if (!transaction.table().spec().isPartitioned()) { + OverwriteFiles overwriteFiles = transaction + .newOverwrite() + .scanManifestsWith(ops.getThreadPoolWithPreAuth()); try (CloseableIterable fileScanTasks = table.newScan().planFiles()) { fileScanTasks.forEach(f -> overwriteFiles.deleteFile(f.file())); } catch (IOException e) { @@ -183,7 +184,9 @@ private void commitReplaceTxn(Table table, List pendingResults) { } // commit replace partitions - ReplacePartitions appendPartitionOp = table.newReplacePartitions(); + ReplacePartitions appendPartitionOp = transaction + .newReplacePartitions() + .scanManifestsWith(ops.getThreadPoolWithPreAuth()); for (WriteResult result : pendingResults) { Preconditions.checkState(result.referencedDataFiles().length == 0, "Should have no referenced data files.");