From 370865cd596a0ecf9ef727a7a2d3689ac6339bb8 Mon Sep 17 00:00:00 2001 From: meiyi Date: Wed, 25 Jun 2025 16:08:48 +0800 Subject: [PATCH 1/2] fix --- .../rules/rewrite/PruneEmptyPartition.java | 28 +++++++++++++++++++ .../doris/transaction/TransactionEntry.java | 26 +++++++++++++++++ .../insert_p0/transaction/txn_insert.groovy | 22 +++++++++++++++ 3 files changed, 76 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneEmptyPartition.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneEmptyPartition.java index c7b8f452afbe66..e402dd6aec7326 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneEmptyPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneEmptyPartition.java @@ -18,18 +18,23 @@ package org.apache.doris.nereids.rules.rewrite; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; import org.apache.doris.qe.ConnectContext; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.util.List; /** * Used to prune empty partition. */ public class PruneEmptyPartition extends OneRewriteRuleFactory { + public static final Logger LOG = LogManager.getLogger(PruneEmptyPartition.class); @Override public Rule build() { @@ -38,6 +43,10 @@ public Rule build() { OlapTable table = scan.getTable(); List partitionIdsToPrune = scan.getSelectedPartitionIds(); List ids = table.selectNonEmptyPartitionIds(partitionIdsToPrune); + if (ConnectContext.get() != null && ConnectContext.get().isTxnModel()) { + // In transaction load, need to add empty partitions which have invisible data of sub transactions + selectNonEmptyPartitionIdsForTxnLoad(table, scan.getSelectedIndexId(), partitionIdsToPrune, ids); + } if (ids.isEmpty()) { return new LogicalEmptyRelation(ConnectContext.get().getStatementContext().getNextRelationId(), scan.getOutput()); @@ -49,4 +58,23 @@ public Rule build() { return scan.withSelectedPartitionIds(ids); }).toRule(RuleType.PRUNE_EMPTY_PARTITION); } + + private void selectNonEmptyPartitionIdsForTxnLoad(OlapTable table, long indexId, List selectedPartitions, + List nonEmptyPartitionIds) { + for (Long selectedPartitionId : selectedPartitions) { + if (nonEmptyPartitionIds.contains(selectedPartitionId)) { + continue; + } + Partition partition = table.getPartition(selectedPartitionId); + if (partition == null) { + continue; + } + if (!ConnectContext.get().getTxnEntry().getPartitionSubTxnIds(table.getId(), partition, indexId) + .isEmpty()) { + nonEmptyPartitionIds.add(selectedPartitionId); + } + } + LOG.debug("add partition for txn load, table: {}, selected partitions: {}, non empty partitions: {}", + table.getId(), selectedPartitions, nonEmptyPartitionIds); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java index d3bf39e4f0f01c..a2725ffd495747 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java @@ -21,6 +21,8 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MaterializedIndex; +import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; import org.apache.doris.cloud.transaction.CloudGlobalTransactionMgr; @@ -492,6 +494,30 @@ public void setTxnLoadInfoInObserver(TTxnLoadInfo txnLoadInfo) throws DdlExcepti + "subTxnStates={}", label, transactionId, dbId, timeoutTimestamp, allSubTxnNum, subTransactionStates); } + public List getPartitionSubTxnIds(long tableId, Partition partition, long indexId) { + List subTxnIds = new ArrayList<>(); + MaterializedIndex index = partition.getIndex(indexId); + if (index == null) { + LOG.error("index={} not found in table={}, partition={}", indexId, tableId, partition.getId()); + return subTxnIds; + } + for (SubTransactionState subTransactionState : subTransactionStates) { + if (subTransactionState.getTable().getId() != tableId) { + continue; + } + for (TTabletCommitInfo tabletCommitInfo : subTransactionState.getTabletCommitInfos()) { + if (index.getTablet(tabletCommitInfo.getTabletId()) != null) { + subTxnIds.add(subTransactionState.getSubTransactionId()); + break; + } + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("table_id={}, partition_id={}, sub_txn_ids={}", tableId, partition.getId(), subTxnIds); + } + return subTxnIds; + } + private void resetByTxnInfo(TTxnLoadInfo txnLoadInfo) throws DdlException { if (txnLoadInfo.isSetDbId()) { this.dbId = txnLoadInfo.getDbId(); diff --git a/regression-test/suites/insert_p0/transaction/txn_insert.groovy b/regression-test/suites/insert_p0/transaction/txn_insert.groovy index f4c8caa35ba991..729b48ca7a1d4c 100644 --- a/regression-test/suites/insert_p0/transaction/txn_insert.groovy +++ b/regression-test/suites/insert_p0/transaction/txn_insert.groovy @@ -784,6 +784,28 @@ suite("txn_insert") { order_qt_select_cu2 """select * from ${unique_table}_2""" order_qt_select_cu3 """select * from ${unique_table}_3""" } + + // 19. delete from empty table + sql """ drop table if exists txn_insert_dt6; """ + sql """ + CREATE TABLE `txn_insert_dt6` ( + `ID` int NOT NULL, + `NAME` varchar(100) NULL, + `SCORE` int NULL + ) ENGINE=OLAP + DUPLICATE KEY(`ID`) + DISTRIBUTED BY HASH(`ID`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + sql """ begin; """ + sql """ INSERT INTO txn_insert_dt6 select 1, 'Alice', 100; """ + test { + sql """ delete from txn_insert_dt6 where id = 1; """ + exception """Can not delete because there is a insert operation for the same table""" + } + sql """ rollback; """ } def db_name = "regression_test_insert_p0_transaction" From 94de8fb5eab2b8d273e24930dec84d98c20f8b94 Mon Sep 17 00:00:00 2001 From: meiyi Date: Fri, 27 Jun 2025 20:09:58 +0800 Subject: [PATCH 2/2] fix --- .../rules/rewrite/PruneEmptyPartition.java | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneEmptyPartition.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneEmptyPartition.java index e402dd6aec7326..3347518b1652a8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneEmptyPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneEmptyPartition.java @@ -24,6 +24,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.transaction.TransactionEntry; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -43,9 +44,10 @@ public Rule build() { OlapTable table = scan.getTable(); List partitionIdsToPrune = scan.getSelectedPartitionIds(); List ids = table.selectNonEmptyPartitionIds(partitionIdsToPrune); - if (ConnectContext.get() != null && ConnectContext.get().isTxnModel()) { + if (ctx.connectContext != null && ctx.connectContext.isTxnModel()) { // In transaction load, need to add empty partitions which have invisible data of sub transactions - selectNonEmptyPartitionIdsForTxnLoad(table, scan.getSelectedIndexId(), partitionIdsToPrune, ids); + selectNonEmptyPartitionIdsForTxnLoad(ctx.connectContext.getTxnEntry(), table, scan.getSelectedIndexId(), + partitionIdsToPrune, ids); } if (ids.isEmpty()) { return new LogicalEmptyRelation(ConnectContext.get().getStatementContext().getNextRelationId(), @@ -59,8 +61,8 @@ public Rule build() { }).toRule(RuleType.PRUNE_EMPTY_PARTITION); } - private void selectNonEmptyPartitionIdsForTxnLoad(OlapTable table, long indexId, List selectedPartitions, - List nonEmptyPartitionIds) { + private void selectNonEmptyPartitionIdsForTxnLoad(TransactionEntry txnEntry, OlapTable table, long indexId, + List selectedPartitions, List nonEmptyPartitionIds) { for (Long selectedPartitionId : selectedPartitions) { if (nonEmptyPartitionIds.contains(selectedPartitionId)) { continue; @@ -69,12 +71,13 @@ private void selectNonEmptyPartitionIdsForTxnLoad(OlapTable table, long indexId, if (partition == null) { continue; } - if (!ConnectContext.get().getTxnEntry().getPartitionSubTxnIds(table.getId(), partition, indexId) - .isEmpty()) { + if (!txnEntry.getPartitionSubTxnIds(table.getId(), partition, indexId).isEmpty()) { nonEmptyPartitionIds.add(selectedPartitionId); } } - LOG.debug("add partition for txn load, table: {}, selected partitions: {}, non empty partitions: {}", - table.getId(), selectedPartitions, nonEmptyPartitionIds); + if (LOG.isDebugEnabled()) { + LOG.debug("add partition for txn load, table: {}, selected partitions: {}, non empty partitions: {}", + table.getId(), selectedPartitions, nonEmptyPartitionIds); + } } }