diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index cbf56097df07f7..4a0dae2b160878 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -69,6 +69,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import jdk.internal.loader.AbstractClassLoaderValue.Sub; import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -86,6 +87,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; @@ -1062,11 +1064,6 @@ public void finishTransaction(long transactionId) throws UserException { readUnlock(); } - // add all commit errors and publish errors to a single set - Set errorReplicaIds = transactionState.getErrorReplicas(); - - List> relatedTblPartitions = Lists.newArrayList(); - // case 1 If database is dropped, then we just throw MetaNotFoundException, because all related tables are // already force dropped, we just ignore the transaction with all tables been force dropped. // case 2 If at least one table lock successfully, which means that the transaction should be finished for @@ -1084,12 +1081,27 @@ public void finishTransaction(long transactionId) throws UserException { tableList = MetaLockUtils.writeLockTablesIfExist(tableList); PublishResult publishResult; try { - if (!finishCheckPartitionVersion(transactionState, db, relatedTblPartitions)) { - return; - } - publishResult = finishCheckQuorumReplicas(transactionState, relatedTblPartitions, errorReplicaIds); - if (publishResult == PublishResult.FAILED) { - return; + // add all commit errors and publish errors to a single set + Set errorReplicaIds = transactionState.getErrorReplicas(); + if (transactionState.getSubTransactionStates() == null) { + List> relatedTblPartitions = Lists.newArrayList(); + if (!finishCheckPartitionVersionWithoutSubTxns(transactionState, db, relatedTblPartitions)) { + return; + } + publishResult = finishCheckQuorumReplicas(transactionState, relatedTblPartitions, errorReplicaIds); + if (publishResult == PublishResult.FAILED) { + return; + } + } else { + // Map>> relatedTblPartitions = new TreeMap<>(); + List> relatedTblPartitions = Lists.newArrayList(); + if (!finishCheckPartitionVersionWithSubTxns(transactionState, db, relatedTblPartitions)) { + return; + } + publishResult = finishCheckQuorumReplicas(transactionState, errorReplicaIds); + if (publishResult == PublishResult.FAILED) { + return; + } } boolean txnOperated = false; writeLock(); @@ -1403,6 +1415,142 @@ private PublishResult finishCheckQuorumReplicas(TransactionState transactionStat return publishResult; } + private PublishResult finishCheckQuorumReplicas(TransactionState transactionState, + Set errorReplicaIds) { + long now = System.currentTimeMillis(); + long firstPublishVersionTime = transactionState.getFirstPublishVersionTime(); + boolean allowPublishOneSucc = false; + if (Config.publish_wait_time_second > 0 && firstPublishVersionTime > 0 + && now >= firstPublishVersionTime + Config.publish_wait_time_second * 1000L) { + allowPublishOneSucc = true; + } + + List tabletSuccReplicas = Lists.newArrayList(); + List tabletWriteFailedReplicas = Lists.newArrayList(); + List tabletVersionFailedReplicas = Lists.newArrayList(); + List logs = Lists.newArrayList(); + + Map> publishTasks = transactionState.getPublishVersionTasks(); + PublishResult publishResult = PublishResult.QUORUM_SUCC; + for (Pair pair : relatedTblPartitions) { + OlapTable table = pair.key(); + Partition partition = pair.value(); + long tableId = table.getId(); + long partitionId = partition.getId(); + long newVersion = partition.getVisibleVersion() + 1; + int loadRequiredReplicaNum = table.getLoadRequiredReplicaNum(partitionId); + List allIndices; + if (transactionState.getLoadedTblIndexes().isEmpty()) { + allIndices = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL); + } else { + allIndices = Lists.newArrayList(); + for (long indexId : transactionState.getLoadedTblIndexes().get(tableId)) { + MaterializedIndex index = partition.getIndex(indexId); + if (index != null) { + allIndices.add(index); + } + } + } + + boolean alterReplicaLoadedTxn = isAlterReplicaLoadedTxn(transactionState.getTransactionId(), table); + + // check success replica number for each tablet. + // a success replica means: + // 1. Not in errorReplicaIds: succeed in both commit and publish phase + // 2. last failed version < 0: is a health replica before + // 3. version catch up: not with a stale version + // Here we only check number, the replica version will be updated in updateCatalogAfterVisible() + for (MaterializedIndex index : allIndices) { + for (Tablet tablet : index.getTablets()) { + tabletSuccReplicas.clear(); + tabletWriteFailedReplicas.clear(); + tabletVersionFailedReplicas.clear(); + for (Replica replica : tablet.getReplicas()) { + // TODO + for (PublishVersionTask publishVersionTask : publishTasks.get(replica.getBackendId())) { + boolean needCheck = false; + needCheck = transactionState.getSubTxnIdToTableCommitInfo().isEmpty() || + transactionState.getSubTxnIdToTableCommitInfo().entrySet().stream().anyMatch( + s -> s.getKey() == publishVersionTask.getTransactionId() + && s.getValue().getTableId() == tableId); + if (needCheck) { + checkReplicaContinuousVersionSucc(tablet.getId(), replica, alterReplicaLoadedTxn, + newVersion, publishVersionTask, + errorReplicaIds, tabletSuccReplicas, tabletWriteFailedReplicas, + tabletVersionFailedReplicas); + } else { + LOG.info("sout: skip check"); + } + } + } + + int healthReplicaNum = tabletSuccReplicas.size(); + if (healthReplicaNum >= loadRequiredReplicaNum) { + boolean hasFailedReplica = !tabletWriteFailedReplicas.isEmpty() + || !tabletVersionFailedReplicas.isEmpty(); + if (hasFailedReplica) { + String writeDetail = getTabletWriteDetail(tabletSuccReplicas, + tabletWriteFailedReplicas, tabletVersionFailedReplicas); + logs.add(String.format("publish version quorum succ for transaction %s on tablet %s" + + " with version %s, and has failed replicas, load require replica num %s. " + + "table %s, partition: [ id=%s, commit version=%s ], tablet detail: %s", + transactionState, tablet.getId(), newVersion, loadRequiredReplicaNum, tableId, + partitionId, partition.getCommittedVersion(), writeDetail)); + } + continue; + } + + String writeDetail = getTabletWriteDetail(tabletSuccReplicas, tabletWriteFailedReplicas, + tabletVersionFailedReplicas); + if (allowPublishOneSucc && healthReplicaNum > 0) { + if (publishResult == PublishResult.QUORUM_SUCC) { + publishResult = PublishResult.TIMEOUT_SUCC; + } + // We can not do any thing except retrying, + // because publish task is assigned a version, + // and doris does not permit discontinuous + // versions. + // + // If a timeout happens, it means that the rowset + // that are being publised exists on a few replicas we should go + // ahead, otherwise data may be lost and thre + // publish task hangs forever. + logs.add(String.format("publish version timeout succ for transaction %s on tablet %s " + + "with version %s, and has failed replicas, load require replica num %s. " + + "table %s, partition %s, tablet detail: %s", + transactionState, tablet.getId(), newVersion, + loadRequiredReplicaNum, tableId, partitionId, writeDetail)); + } else { + publishResult = PublishResult.FAILED; + String errMsg = String.format("publish on tablet %d failed." + + " succeed replica num %d < load required replica num %d." + + " table: %d, partition: %d, publish version: %d", + tablet.getId(), healthReplicaNum, loadRequiredReplicaNum, tableId, + partitionId, newVersion); + transactionState.setErrorMsg(errMsg); + logs.add(String.format("publish version failed for transaction %s on tablet %s with version" + + " %s, and has failed replicas, load required replica num %s. table %s, " + + "partition %s, tablet detail: %s", + transactionState, tablet.getId(), newVersion, + loadRequiredReplicaNum, tableId, partitionId, writeDetail)); + } + } + } + } + + boolean needLog = publishResult != PublishResult.FAILED + || now - transactionState.getLastPublishLogTime() > Config.publish_fail_log_interval_second * 1000L; + if (true) { + transactionState.setLastPublishLogTime(now); + for (String log : logs) { + LOG.info("{}. publish times {}, whole txn publish result {}", + log, transactionState.getPublishCount(), publishResult.name()); + } + } + + return publishResult; + } + private boolean isAlterReplicaLoadedTxn(long transactionId, OlapTable table) { List unfinishedAlterJobs = null; if (table.getState() == OlapTable.OlapTableState.SCHEMA_CHANGE) {