From 3622287916843747d3787e66cb1731ee28bf3567 Mon Sep 17 00:00:00 2001 From: "wuwenchi.wwc" Date: Thu, 3 Jul 2025 15:38:31 +0800 Subject: [PATCH 1/2] fix --- .../iceberg/IcebergTransaction.java | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java index 95cf6c36a3b37c..14f2eab3438b5d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java @@ -108,8 +108,8 @@ public void finishInsert(NameMapping nameMapping, Optional } private void updateManifestAfterInsert(TUpdateMode updateMode) { - PartitionSpec spec = table.spec(); - FileFormat fileFormat = IcebergUtils.getFileFormat(table); + PartitionSpec spec = transaction.table().spec(); + FileFormat fileFormat = IcebergUtils.getFileFormat(transaction.table()); List pendingResults; if (commitDataList.isEmpty()) { @@ -122,9 +122,9 @@ private void updateManifestAfterInsert(TUpdateMode updateMode) { } if (updateMode == TUpdateMode.APPEND) { - commitAppendTxn(table, pendingResults); + commitAppendTxn(pendingResults); } else { - commitReplaceTxn(table, pendingResults); + commitReplaceTxn(pendingResults); } } @@ -143,43 +143,44 @@ public long getUpdateCnt() { return commitDataList.stream().mapToLong(TIcebergCommitData::getRowCount).sum(); } - private void commitAppendTxn(Table table, List pendingResults) { + private void commitAppendTxn(List pendingResults) { // commit append files. - AppendFiles appendFiles = table.newAppend().scanManifestsWith(ops.getThreadPoolWithPreAuth()); + AppendFiles appendFiles = transaction.newAppend().scanManifestsWith(ops.getThreadPoolWithPreAuth()); for (WriteResult result : pendingResults) { Preconditions.checkState(result.referencedDataFiles().length == 0, "Should have no referenced data files for append."); Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile); } - appendFiles.commit(); } - private void commitReplaceTxn(Table table, List pendingResults) { + private void commitReplaceTxn(List pendingResults) { if (pendingResults.isEmpty()) { // such as : insert overwrite table `dst_tb` select * from `empty_tb` // 1. if dst_tb is a partitioned table, it will return directly. // 2. if dst_tb is an unpartitioned table, the `dst_tb` table will be emptied. - if (!table.spec().isPartitioned()) { - OverwriteFiles overwriteFiles = table.newOverwrite().scanManifestsWith(ops.getThreadPoolWithPreAuth()); + if (!transaction.table().spec().isPartitioned()) { + OverwriteFiles overwriteFiles = transaction + .newOverwrite() + .scanManifestsWith(ops.getThreadPoolWithPreAuth()); try (CloseableIterable fileScanTasks = table.newScan().planFiles()) { fileScanTasks.forEach(f -> overwriteFiles.deleteFile(f.file())); } catch (IOException e) { throw new RuntimeException(e); } - overwriteFiles.commit(); } return; } // commit replace partitions - ReplacePartitions appendPartitionOp = table.newReplacePartitions(); + ReplacePartitions appendPartitionOp = transaction + .newReplacePartitions() + .scanManifestsWith(ops.getThreadPoolWithPreAuth()); for (WriteResult result : pendingResults) { Preconditions.checkState(result.referencedDataFiles().length == 0, "Should have no referenced data files."); Arrays.stream(result.dataFiles()).forEach(appendPartitionOp::addFile); } - appendPartitionOp.commit(); } } From 012ef67864c79fae878671747b9e027adecbab8b Mon Sep 17 00:00:00 2001 From: "wuwenchi.wwc" Date: Thu, 3 Jul 2025 16:14:48 +0800 Subject: [PATCH 2/2] fix --- .../apache/doris/datasource/iceberg/IcebergTransaction.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java index 14f2eab3438b5d..9160c3012b5c59 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java @@ -151,6 +151,7 @@ private void commitAppendTxn(List pendingResults) { "Should have no referenced data files for append."); Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile); } + appendFiles.commit(); } @@ -168,6 +169,7 @@ private void commitReplaceTxn(List pendingResults) { } catch (IOException e) { throw new RuntimeException(e); } + overwriteFiles.commit(); } return; } @@ -181,6 +183,7 @@ private void commitReplaceTxn(List pendingResults) { "Should have no referenced data files."); Arrays.stream(result.dataFiles()).forEach(appendPartitionOp::addFile); } + appendPartitionOp.commit(); } }