Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
mymeiyi committed Mar 28, 2024
1 parent d226acf commit 2780e82
Showing 1 changed file with 159 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import jdk.internal.loader.AbstractClassLoaderValue.Sub;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -86,6 +87,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -1062,11 +1064,6 @@ public void finishTransaction(long transactionId) throws UserException {
readUnlock();
}

// add all commit errors and publish errors to a single set
Set<Long> errorReplicaIds = transactionState.getErrorReplicas();

List<Pair<OlapTable, Partition>> relatedTblPartitions = Lists.newArrayList();

// case 1: If the database is dropped, we just throw MetaNotFoundException. Because all related tables have
// already been force dropped, the transaction is simply ignored.
// case 2 If at least one table lock successfully, which means that the transaction should be finished for
Expand All @@ -1084,12 +1081,27 @@ public void finishTransaction(long transactionId) throws UserException {
tableList = MetaLockUtils.writeLockTablesIfExist(tableList);
PublishResult publishResult;
try {
if (!finishCheckPartitionVersion(transactionState, db, relatedTblPartitions)) {
return;
}
publishResult = finishCheckQuorumReplicas(transactionState, relatedTblPartitions, errorReplicaIds);
if (publishResult == PublishResult.FAILED) {
return;
// add all commit errors and publish errors to a single set
Set<Long> errorReplicaIds = transactionState.getErrorReplicas();
if (transactionState.getSubTransactionStates() == null) {
List<Pair<OlapTable, Partition>> relatedTblPartitions = Lists.newArrayList();
if (!finishCheckPartitionVersionWithoutSubTxns(transactionState, db, relatedTblPartitions)) {
return;
}
publishResult = finishCheckQuorumReplicas(transactionState, relatedTblPartitions, errorReplicaIds);
if (publishResult == PublishResult.FAILED) {
return;
}
} else {
// Map<Long, List<Pair<Partition, Long>>> relatedTblPartitions = new TreeMap<>();
List<Pair<OlapTable, Partition>> relatedTblPartitions = Lists.newArrayList();
if (!finishCheckPartitionVersionWithSubTxns(transactionState, db, relatedTblPartitions)) {
return;
}
publishResult = finishCheckQuorumReplicas(transactionState, errorReplicaIds);
if (publishResult == PublishResult.FAILED) {
return;
}
}
boolean txnOperated = false;
writeLock();
Expand Down Expand Up @@ -1403,6 +1415,142 @@ private PublishResult finishCheckQuorumReplicas(TransactionState transactionStat
return publishResult;
}

/**
 * Checks whether every tablet touched by a transaction that carries sub transactions has enough
 * successfully published replicas.
 *
 * <p>For each related (table, partition) pair the tablets of the loaded indexes are visited, and every
 * replica is checked against the publish version tasks of its backend. A tablet passes when the number of
 * successful replicas reaches the table's load-required replica number. When the publish has been waiting
 * longer than {@code Config.publish_wait_time_second}, a tablet with at least one successful replica is
 * accepted as well and the result is downgraded to {@code TIMEOUT_SUCC}.
 *
 * @param transactionState the transaction being published; its error message and last publish log time
 *                         may be updated by this method
 * @param errorReplicaIds  replica ids that already failed in the commit or publish phase; passed on to
 *                         {@code checkReplicaContinuousVersionSucc}
 * @return {@code QUORUM_SUCC} when every tablet reaches quorum, {@code TIMEOUT_SUCC} when at least one
 *         tablet was only accepted because of the publish timeout, {@code FAILED} otherwise
 */
private PublishResult finishCheckQuorumReplicas(TransactionState transactionState,
        Set<Long> errorReplicaIds) {
    long now = System.currentTimeMillis();
    long firstPublishVersionTime = transactionState.getFirstPublishVersionTime();
    // After the configured wait time has elapsed since the first publish attempt, one healthy replica
    // per tablet is enough to let the publish go through.
    boolean allowPublishOneSucc = Config.publish_wait_time_second > 0 && firstPublishVersionTime > 0
            && now >= firstPublishVersionTime + Config.publish_wait_time_second * 1000L;

    // The three replica lists are reused for every tablet and cleared at the start of each iteration.
    List<Replica> tabletSuccReplicas = Lists.newArrayList();
    List<Replica> tabletWriteFailedReplicas = Lists.newArrayList();
    List<Replica> tabletVersionFailedReplicas = Lists.newArrayList();
    List<String> logs = Lists.newArrayList();

    // backend id -> publish version tasks sent to that backend
    Map<Long, List<PublishVersionTask>> publishTasks = transactionState.getPublishVersionTasks();
    PublishResult publishResult = PublishResult.QUORUM_SUCC;
    // NOTE(review): relatedTblPartitions is neither a parameter nor a local of this method; it has to
    // resolve to a member of the enclosing class for this to compile - confirm, or pass it in explicitly.
    for (Pair<OlapTable, Partition> pair : relatedTblPartitions) {
        OlapTable table = pair.key();
        Partition partition = pair.value();
        long tableId = table.getId();
        long partitionId = partition.getId();
        long newVersion = partition.getVisibleVersion() + 1;
        int loadRequiredReplicaNum = table.getLoadRequiredReplicaNum(partitionId);
        List<MaterializedIndex> allIndices;
        if (transactionState.getLoadedTblIndexes().isEmpty()) {
            allIndices = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
        } else {
            allIndices = Lists.newArrayList();
            for (long indexId : transactionState.getLoadedTblIndexes().get(tableId)) {
                MaterializedIndex index = partition.getIndex(indexId);
                if (index != null) {
                    allIndices.add(index);
                }
            }
        }

        boolean alterReplicaLoadedTxn = isAlterReplicaLoadedTxn(transactionState.getTransactionId(), table);

        // Check the number of successful replicas for each tablet.
        // A successful replica means:
        // 1. Not in errorReplicaIds: succeeded in both the commit and the publish phase
        // 2. last failed version < 0: was a healthy replica before
        // 3. version caught up: does not have a stale version
        // Only the count is checked here; the replica version is updated in updateCatalogAfterVisible()
        for (MaterializedIndex index : allIndices) {
            for (Tablet tablet : index.getTablets()) {
                tabletSuccReplicas.clear();
                tabletWriteFailedReplicas.clear();
                tabletVersionFailedReplicas.clear();
                for (Replica replica : tablet.getReplicas()) {
                    List<PublishVersionTask> backendTasks = publishTasks.get(replica.getBackendId());
                    if (backendTasks == null) {
                        // No publish task is known for this backend. Iterating the missing list would
                        // throw a NullPointerException, so the replica is skipped and therefore does
                        // not count towards the quorum.
                        // NOTE(review): confirm whether such a replica should also be recorded as
                        // failed (e.g. added to errorReplicaIds).
                        continue;
                    }
                    // TODO
                    for (PublishVersionTask publishVersionTask : backendTasks) {
                        // Only tasks whose sub transaction wrote into this table are relevant here;
                        // without any sub transaction commit info every task is checked.
                        boolean needCheck = transactionState.getSubTxnIdToTableCommitInfo().isEmpty()
                                || transactionState.getSubTxnIdToTableCommitInfo().entrySet().stream().anyMatch(
                                        s -> s.getKey() == publishVersionTask.getTransactionId()
                                                && s.getValue().getTableId() == tableId);
                        if (needCheck) {
                            checkReplicaContinuousVersionSucc(tablet.getId(), replica, alterReplicaLoadedTxn,
                                    newVersion, publishVersionTask,
                                    errorReplicaIds, tabletSuccReplicas, tabletWriteFailedReplicas,
                                    tabletVersionFailedReplicas);
                        } else {
                            // Runs once per replica and task, so keep it out of the INFO log.
                            LOG.debug("skip publish check: task of txn {} on backend {} does not match"
                                    + " table {}, tablet {}", publishVersionTask.getTransactionId(),
                                    replica.getBackendId(), tableId, tablet.getId());
                        }
                    }
                }

                int healthReplicaNum = tabletSuccReplicas.size();
                if (healthReplicaNum >= loadRequiredReplicaNum) {
                    boolean hasFailedReplica = !tabletWriteFailedReplicas.isEmpty()
                            || !tabletVersionFailedReplicas.isEmpty();
                    if (hasFailedReplica) {
                        String writeDetail = getTabletWriteDetail(tabletSuccReplicas,
                                tabletWriteFailedReplicas, tabletVersionFailedReplicas);
                        logs.add(String.format("publish version quorum succ for transaction %s on tablet %s"
                                + " with version %s, and has failed replicas, load require replica num %s. "
                                + "table %s, partition: [ id=%s, commit version=%s ], tablet detail: %s",
                                transactionState, tablet.getId(), newVersion, loadRequiredReplicaNum, tableId,
                                partitionId, partition.getCommittedVersion(), writeDetail));
                    }
                    continue;
                }

                String writeDetail = getTabletWriteDetail(tabletSuccReplicas, tabletWriteFailedReplicas,
                        tabletVersionFailedReplicas);
                if (allowPublishOneSucc && healthReplicaNum > 0) {
                    // Never upgrade an earlier FAILED back to a success state.
                    if (publishResult == PublishResult.QUORUM_SUCC) {
                        publishResult = PublishResult.TIMEOUT_SUCC;
                    }
                    // We cannot do anything except retry, because the publish task has been assigned a
                    // version and doris does not permit discontinuous versions.
                    //
                    // If a timeout happens, it means that the rowset being published exists on only a
                    // few replicas. We should go ahead, otherwise data may be lost and the publish task
                    // hangs forever.
                    logs.add(String.format("publish version timeout succ for transaction %s on tablet %s "
                            + "with version %s, and has failed replicas, load require replica num %s. "
                            + "table %s, partition %s, tablet detail: %s",
                            transactionState, tablet.getId(), newVersion,
                            loadRequiredReplicaNum, tableId, partitionId, writeDetail));
                } else {
                    publishResult = PublishResult.FAILED;
                    String errMsg = String.format("publish on tablet %d failed."
                            + " succeed replica num %d < load required replica num %d."
                            + " table: %d, partition: %d, publish version: %d",
                            tablet.getId(), healthReplicaNum, loadRequiredReplicaNum, tableId,
                            partitionId, newVersion);
                    transactionState.setErrorMsg(errMsg);
                    logs.add(String.format("publish version failed for transaction %s on tablet %s with version"
                            + " %s, and has failed replicas, load required replica num %s. table %s, "
                            + "partition %s, tablet detail: %s",
                            transactionState, tablet.getId(), newVersion,
                            loadRequiredReplicaNum, tableId, partitionId, writeDetail));
                }
            }
        }
    }

    // Successful results are always logged; failures are throttled to one batch per
    // publish_fail_log_interval_second so that a repeatedly retried publish does not flood the log.
    boolean needLog = publishResult != PublishResult.FAILED
            || now - transactionState.getLastPublishLogTime() > Config.publish_fail_log_interval_second * 1000L;
    if (needLog) {
        transactionState.setLastPublishLogTime(now);
        for (String log : logs) {
            LOG.info("{}. publish times {}, whole txn publish result {}",
                    log, transactionState.getPublishCount(), publishResult.name());
        }
    }

    return publishResult;
}

private boolean isAlterReplicaLoadedTxn(long transactionId, OlapTable table) {
List<AlterJobV2> unfinishedAlterJobs = null;
if (table.getState() == OlapTable.OlapTableState.SCHEMA_CHANGE) {
Expand Down

0 comments on commit 2780e82

Please sign in to comment.