diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java index 95cf6c36a3b37c..9160c3012b5c59 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java @@ -108,8 +108,8 @@ public void finishInsert(NameMapping nameMapping, Optional } private void updateManifestAfterInsert(TUpdateMode updateMode) { - PartitionSpec spec = table.spec(); - FileFormat fileFormat = IcebergUtils.getFileFormat(table); + PartitionSpec spec = transaction.table().spec(); + FileFormat fileFormat = IcebergUtils.getFileFormat(transaction.table()); List pendingResults; if (commitDataList.isEmpty()) { @@ -122,9 +122,9 @@ private void updateManifestAfterInsert(TUpdateMode updateMode) { } if (updateMode == TUpdateMode.APPEND) { - commitAppendTxn(table, pendingResults); + commitAppendTxn(pendingResults); } else { - commitReplaceTxn(table, pendingResults); + commitReplaceTxn(pendingResults); } } @@ -143,9 +143,9 @@ public long getUpdateCnt() { return commitDataList.stream().mapToLong(TIcebergCommitData::getRowCount).sum(); } - private void commitAppendTxn(Table table, List pendingResults) { + private void commitAppendTxn(List pendingResults) { // commit append files. - AppendFiles appendFiles = table.newAppend().scanManifestsWith(ops.getThreadPoolWithPreAuth()); + AppendFiles appendFiles = transaction.newAppend().scanManifestsWith(ops.getThreadPoolWithPreAuth()); for (WriteResult result : pendingResults) { Preconditions.checkState(result.referencedDataFiles().length == 0, "Should have no referenced data files for append."); @@ -155,13 +155,15 @@ private void commitAppendTxn(Table table, List pendingResults) { } - private void commitReplaceTxn(Table table, List pendingResults) { + private void commitReplaceTxn(List pendingResults) { if (pendingResults.isEmpty()) { // such as : insert overwrite table `dst_tb` select * from `empty_tb` // 1. if dst_tb is a partitioned table, it will return directly. // 2. if dst_tb is an unpartitioned table, the `dst_tb` table will be emptied. - if (!table.spec().isPartitioned()) { - OverwriteFiles overwriteFiles = table.newOverwrite().scanManifestsWith(ops.getThreadPoolWithPreAuth()); + if (!transaction.table().spec().isPartitioned()) { + OverwriteFiles overwriteFiles = transaction + .newOverwrite() + .scanManifestsWith(ops.getThreadPoolWithPreAuth()); try (CloseableIterable fileScanTasks = table.newScan().planFiles()) { fileScanTasks.forEach(f -> overwriteFiles.deleteFile(f.file())); } catch (IOException e) { @@ -173,7 +175,9 @@ private void commitReplaceTxn(Table table, List pendingResults) { } // commit replace partitions - ReplacePartitions appendPartitionOp = table.newReplacePartitions(); + ReplacePartitions appendPartitionOp = transaction + .newReplacePartitions() + .scanManifestsWith(ops.getThreadPoolWithPreAuth()); for (WriteResult result : pendingResults) { Preconditions.checkState(result.referencedDataFiles().length == 0, "Should have no referenced data files.");