Skip to content

Commit

Permalink
[Fix](insert-overwrite) Fix insert overwrite auto detect transaction …
Browse files Browse the repository at this point in the history
…safe (#38103)

Previously, if insert overwrite auto detect failed because of a
transaction conflict, it could end up in an unexpected state in which some
of the partitions had been replaced while others were retained.
Now it either wholly succeeds or wholly fails.
  • Loading branch information
zclllyybb authored Jul 25, 2024
1 parent 00fca3e commit 1c18b4c
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1216,10 +1216,13 @@ public Set<String> getPartitionNames() {
return Sets.newHashSet(nameToPartition.keySet());
}

public List<String> uncheckedGetPartNamesById(List<Long> partitionIds) {
// for the positions where both partition id lists hold the same id, get the partition names.
public List<String> getEqualPartitionNames(List<Long> partitionIds1, List<Long> partitionIds2) {
List<String> names = new ArrayList<String>();
for (Long id : partitionIds) {
names.add(idToPartition.get(id).getName());
for (int i = 0; i < partitionIds1.size(); i++) {
if (partitionIds1.get(i).equals(partitionIds2.get(i))) {
names.add(getPartition(partitionIds1.get(i)).getName());
}
}
return names;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class InsertOverwriteManager extends MasterDaemon implements Writable {
// but we only change one time and save the relations in partitionPairs. they're protected by taskLocks
@SerializedName(value = "taskLocks")
private Map<Long, ReentrantLock> taskLocks = Maps.newConcurrentMap();
// <groupId, <oldPartId, newPartId>>
// <groupId, <oldPartId, newPartId>>. no need concern which task it belongs to.
@SerializedName(value = "partitionPairs")
private Map<Long, Map<Long, Long>> partitionPairs = Maps.newConcurrentMap();

Expand Down Expand Up @@ -91,7 +91,7 @@ public long registerTask(long dbId, long tableId, List<String> tempPartitionName
*
* @return group id, like a transaction id.
*/
public long preRegisterTask() {
public long registerTaskGroup() {
long groupId = Env.getCurrentEnv().getNextId();
taskGroups.put(groupId, new ArrayList<Long>());
taskLocks.put(groupId, new ReentrantLock());
Expand All @@ -107,44 +107,81 @@ public void registerTaskInGroup(long groupId, long taskId) {
taskGroups.get(groupId).add(taskId);
}

public List<Long> tryReplacePartitionIds(long groupId, List<Long> oldPartitionIds) {
/**
* this func should in lock scope of getLock(groupId)
*
* @param newIds if have replaced, replace with new. otherwise itself.
*/
public boolean tryReplacePartitionIds(long groupId, List<Long> oldPartitionIds, List<Long> newIds) {
Map<Long, Long> relations = partitionPairs.get(groupId);
List<Long> newIds = new ArrayList<Long>();
for (Long id : oldPartitionIds) {
boolean needReplace = false;
for (int i = 0; i < oldPartitionIds.size(); i++) {
long id = oldPartitionIds.get(i);
if (relations.containsKey(id)) {
// if we replaced it. then return new one.
newIds.add(relations.get(id));
} else {
// otherwise itself. we will deal it soon.
newIds.add(id);
needReplace = true;
}
}
return newIds;
return needReplace;
}

/**
 * Stores, for one task group, the mapping from each old partition id to the
 * partition id recorded as its replacement.
 * This method should be called within the lock scope of getLock(groupId).
 *
 * @param groupId id of the task group whose relation map is updated
 * @param oldIds  ids used as keys of the recorded pairs
 * @param newIds  ids used as values; must have the same size as oldIds and is
 *                paired with it position by position
 */
public void recordPartitionPairs(long groupId, List<Long> oldIds, List<Long> newIds) {
    Map<Long, Long> relations = partitionPairs.get(groupId);
    // both lists are walked in lockstep, so they must be equally long
    Preconditions.checkArgument(oldIds.size() == newIds.size());
    int pairCount = oldIds.size();
    int pos = 0;
    while (pos < pairCount) {
        Long oldId = oldIds.get(pos);
        Long newId = newIds.get(pos);
        relations.put(oldId, newId);
        if (LOG.isDebugEnabled()) {
            LOG.debug("recorded partition pairs: [" + oldId + ", " + newId + "]");
        }
        pos++;
    }
}

/**
 * Looks up the lock registered for a task group.
 * The lock is the symbol that the task group exists: when none is found,
 * the group is considered to have already failed.
 *
 * @param groupId id of the task group
 * @return the lock stored in taskLocks for this group, or null when there is none
 */
public ReentrantLock getLock(long groupId) {
    final ReentrantLock groupLock = taskLocks.get(groupId);
    return groupLock;
}

// When the task group goes into failure, some BEs may not know it yet and may still send new requests.
// That could cause a ConcurrentModification or NullPointer exception.
public void taskGroupFail(long groupId) {
LOG.info("insert overwrite auto detect partition task group [" + groupId + "] failed");
for (Long taskId : taskGroups.get(groupId)) {
taskFail(taskId);
ReentrantLock lock = getLock(groupId);
lock.lock();
try {
// will rollback temp partitions in `taskFail`
for (Long taskId : taskGroups.get(groupId)) {
taskFail(taskId);
}
cleanTaskGroup(groupId);
} finally {
lock.unlock();
}
cleanTaskGroup(groupId);
}

public void taskGroupSuccess(long groupId) {
// here we make all replacements of this group visible. if any of them fails, nothing happens.
public void taskGroupSuccess(long groupId, OlapTable targetTable) throws DdlException {
try {
Map<Long, Long> relations = partitionPairs.get(groupId);
ArrayList<String> oldNames = new ArrayList<>();
ArrayList<String> newNames = new ArrayList<>();
for (Entry<Long, Long> partitionPair : relations.entrySet()) {
oldNames.add(targetTable.getPartition(partitionPair.getKey()).getName());
newNames.add(targetTable.getPartition(partitionPair.getValue()).getName());
}
InsertOverwriteUtil.replacePartition(targetTable, oldNames, newNames);
} catch (Exception e) {
LOG.warn("insert overwrite task making replacement failed because " + e.getMessage()
+ "all new partition will not be visible and will be recycled by partition GC.");
throw e;
}
LOG.info("insert overwrite auto detect partition task group [" + groupId + "] succeed");
for (Long taskId : taskGroups.get(groupId)) {
Env.getCurrentEnv().getEditLog()
.logInsertOverwrite(new InsertOverwriteLog(taskId, tasks.get(taskId), InsertOverwriteOpType.ADD));
taskSuccess(taskId);
}
cleanTaskGroup(groupId);
Expand All @@ -164,6 +201,9 @@ private void cleanTaskGroup(long groupId) {
public void taskFail(long taskId) {
LOG.info("insert overwrite task [" + taskId + "] failed");
boolean rollback = rollback(taskId);
if (!rollback) {
LOG.warn("roll back task [" + taskId + "] failed");
}
if (rollback) {
removeTask(taskId);
} else {
Expand Down Expand Up @@ -192,6 +232,7 @@ public void allTaskFail() {
}
}

// cancel it. should try to remove them after.
private void cancelTask(long taskId) {
if (tasks.containsKey(taskId)) {
LOG.info("cancel insert overwrite task: {}", tasks.get(taskId));
Expand All @@ -201,6 +242,7 @@ private void cancelTask(long taskId) {
}
}

// task and partitions has been removed. it's safe to remove task.
private void removeTask(long taskId) {
if (tasks.containsKey(taskId)) {
LOG.info("remove insert overwrite task: {}", tasks.get(taskId));
Expand All @@ -222,7 +264,7 @@ private boolean rollback(long taskId) {
try {
olapTable = task.getTable();
} catch (DdlException e) {
LOG.warn("can not get table, task: {}", task);
LOG.warn("can not get table, task: {}, reason: {}", task, e.getMessage());
return true;
}
return InsertOverwriteUtil.dropPartitions(olapTable, task.getTempPartitionNames());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public static void addTempPartitions(TableIf tableIf, List<String> partitionName
for (int i = 0; i < partitionNames.size(); i++) {
Env.getCurrentEnv().addPartitionLike((Database) tableIf.getDatabase(), tableIf.getName(),
new AddPartitionLikeClause(tempPartitionNames.get(i), partitionNames.get(i), true));
LOG.info("successfully add temp partition [{}] for [{}]", tempPartitionNames.get(i), tableIf.getName());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,12 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
try {
if (isAutoDetectOverwrite()) {
// taskId here is a group id. it contains all replace tasks made and registered in rpc process.
taskId = Env.getCurrentEnv().getInsertOverwriteManager().preRegisterTask();
// When inserting, BE will call to replace partition by FrontendService. FE do the real
// add&replacement and return replace result. So there's no need to do anything else.
taskId = Env.getCurrentEnv().getInsertOverwriteManager().registerTaskGroup();
// When inserting, BE will call to replace partition by FrontendService. FE will register new temp
// partitions and return. to be transactional, the replacement really occurs only after the insert succeeds,
// i.e. `insertInto` finished. then we call taskGroupSuccess to make the replacement.
insertInto(ctx, executor, taskId);
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId);
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId, (OlapTable) targetTable);
} else {
List<String> tempPartitionNames = InsertOverwriteUtil.generateTempPartitionNames(partitionNames);
taskId = Env.getCurrentEnv().getInsertOverwriteManager()
Expand All @@ -184,7 +185,7 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
Env.getCurrentEnv().getInsertOverwriteManager().taskSuccess(taskId);
}
} catch (Exception e) {
LOG.warn("insert into overwrite failed");
LOG.warn("insert into overwrite failed with task(or group) id " + taskId);
if (isAutoDetectOverwrite()) {
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupFail(taskId);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Frontend service used to serve all request for this frontend through
// thrift protocol
Expand Down Expand Up @@ -3483,7 +3482,7 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request
LOG.info("Receive replace partition request: {}", request);
long dbId = request.getDbId();
long tableId = request.getTableId();
List<Long> partitionIds = request.getPartitionIds();
List<Long> reqPartitionIds = request.getPartitionIds();
long taskGroupId = request.getOverwriteGroupId();
TReplacePartitionResult result = new TReplacePartitionResult();
TStatus errorStatus = new TStatus(TStatusCode.RUNTIME_ERROR);
Expand Down Expand Up @@ -3522,41 +3521,60 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request
OlapTable olapTable = (OlapTable) table;
InsertOverwriteManager overwriteManager = Env.getCurrentEnv().getInsertOverwriteManager();
ReentrantLock taskLock = overwriteManager.getLock(taskGroupId);
List<String> allReqPartNames; // all request partitions
if (taskLock == null) {
errorStatus.setErrorMsgs(Lists
.newArrayList(new String("cannot find task group " + taskGroupId + ", maybe already failed.")));
result.setStatus(errorStatus);
LOG.warn("send create partition error status: {}", result);
return result;
}

ArrayList<Long> resultPartitionIds = new ArrayList<>(); // [1 2 5 6] -> [7 8 5 6]
ArrayList<Long> pendingPartitionIds = new ArrayList<>(); // pending: [1 2]
ArrayList<Long> newPartitionIds = new ArrayList<>(); // requested temp partition ids. for [7 8]
boolean needReplace = false;
try {
taskLock.lock();
// double check lock. maybe taskLock is not null, but has been removed from the Map. means the task failed.
if (overwriteManager.getLock(taskGroupId) == null) {
errorStatus.setErrorMsgs(Lists
.newArrayList(new String("cannot find task group " + taskGroupId + ", maybe already failed.")));
result.setStatus(errorStatus);
LOG.warn("send create partition error status: {}", result);
return result;
}

// we dont lock the table. other thread in this txn will be controled by taskLock.
// if we have already replaced. dont do it again, but acquire the recorded new partition directly.
// if we have already replaced, dont do it again, but acquire the recorded new partition directly.
// if not by this txn, just let it fail naturally is ok.
List<Long> replacedPartIds = overwriteManager.tryReplacePartitionIds(taskGroupId, partitionIds);
// here if replacedPartIds still have null. this will throw exception.
allReqPartNames = olapTable.uncheckedGetPartNamesById(replacedPartIds);

List<Long> pendingPartitionIds = IntStream.range(0, partitionIds.size())
.filter(i -> partitionIds.get(i) == replacedPartIds.get(i)) // equal means not replaced
.mapToObj(partitionIds::get)
.collect(Collectors.toList());
// from here we ONLY deal the pending partitions. not include the dealed(by others).
if (!pendingPartitionIds.isEmpty()) {
// below two must have same order inner.
List<String> pendingPartitionNames = olapTable.uncheckedGetPartNamesById(pendingPartitionIds);
List<String> tempPartitionNames = InsertOverwriteUtil
.generateTempPartitionNames(pendingPartitionNames);
needReplace = overwriteManager.tryReplacePartitionIds(taskGroupId, reqPartitionIds, resultPartitionIds);
// request: [1 2 3 4] result: [1 2 5 6] means ONLY 1 and 2 need replace.
if (needReplace) {
// names for [1 2]
List<String> pendingPartitionNames = olapTable.getEqualPartitionNames(reqPartitionIds,
resultPartitionIds);
for (String name : pendingPartitionNames) {
pendingPartitionIds.add(olapTable.getPartition(name).getId()); // put [1 2]
}

long taskId = overwriteManager.registerTask(dbId, tableId, tempPartitionNames);
// names for [7 8]
List<String> newTempNames = InsertOverwriteUtil
.generateTempPartitionNames(pendingPartitionNames);
// a task means one time insert overwrite
long taskId = overwriteManager.registerTask(dbId, tableId, newTempNames);
overwriteManager.registerTaskInGroup(taskGroupId, taskId);
InsertOverwriteUtil.addTempPartitions(olapTable, pendingPartitionNames, tempPartitionNames);
InsertOverwriteUtil.replacePartition(olapTable, pendingPartitionNames, tempPartitionNames);
InsertOverwriteUtil.addTempPartitions(olapTable, pendingPartitionNames, newTempNames);
// now temp partitions are bumped up and use new names. we get their ids and record them.
List<Long> newPartitionIds = new ArrayList<Long>();
for (String newPartName : pendingPartitionNames) {
newPartitionIds.add(olapTable.getPartition(newPartName).getId());
for (String newPartName : newTempNames) {
newPartitionIds.add(olapTable.getPartition(newPartName).getId()); // put [7 8]
}
overwriteManager.recordPartitionPairs(taskGroupId, pendingPartitionIds, newPartitionIds);

if (LOG.isDebugEnabled()) {
LOG.debug("partition replacement: ");
for (int i = 0; i < pendingPartitionIds.size(); i++) {
LOG.debug("[" + pendingPartitionIds.get(i) + ", " + newPartitionIds.get(i) + "], ");
LOG.debug("[" + pendingPartitionIds.get(i) + " - " + pendingPartitionNames.get(i) + ", "
+ newPartitionIds.get(i) + " - " + newTempNames.get(i) + "], ");
}
}
}
Expand All @@ -3569,15 +3587,38 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request
taskLock.unlock();
}

// build partition & tablets. now all partitions in allReqPartNames are replaced
// an recorded.
// so they won't be changed again. if other transaction changing it. just let it
// fail.
List<TOlapTablePartition> partitions = Lists.newArrayList();
List<TTabletLocation> tablets = Lists.newArrayList();
// result: [1 2 5 6], make it [7 8 5 6]
int idx = 0;
if (needReplace) {
for (int i = 0; i < reqPartitionIds.size(); i++) {
if (reqPartitionIds.get(i).equals(resultPartitionIds.get(i))) {
resultPartitionIds.set(i, newPartitionIds.get(idx++));
}
}
}
if (idx != newPartitionIds.size()) {
errorStatus.addToErrorMsgs("changed partition number " + idx + " is not correct");
result.setStatus(errorStatus);
LOG.warn("send create partition error status: {}", result);
return result;
}

if (LOG.isDebugEnabled()) {
LOG.debug("replace partition origin ids: ["
+ String.join(", ", reqPartitionIds.stream().map(String::valueOf).collect(Collectors.toList()))
+ ']');
LOG.debug("replace partition result ids: ["
+ String.join(", ", resultPartitionIds.stream().map(String::valueOf).collect(Collectors.toList()))
+ ']');
}

// build partition & tablets. now all partitions in allReqPartNames are replaced and recorded.
// so they won't be changed again. if another transaction is changing them, just let it fail.
List<TOlapTablePartition> partitions = new ArrayList<>();
List<TTabletLocation> tablets = new ArrayList<>();
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
for (String partitionName : allReqPartNames) {
Partition partition = table.getPartition(partitionName);
for (long partitionId : resultPartitionIds) {
Partition partition = olapTable.getPartition(partitionId);
TOlapTablePartition tPartition = new TOlapTablePartition();
tPartition.setId(partition.getId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ excludeSuites = "000_the_start_sentinel_do_not_touch," + // keep this line as th
"test_profile," +
"test_refresh_mtmv," +
"test_spark_load," +
"test_iot_auto_detect_concurrent," +
"zzz_the_end_sentinel_do_not_touch" // keep this line as the last line

excludeDirectories = "000_the_start_sentinel_do_not_touch," + // keep this line as the first line
Expand Down
1 change: 0 additions & 1 deletion regression-test/pipeline/p1/conf/regression-conf.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ excludeSuites = "000_the_start_sentinel_do_not_touch," + // keep this line as th
"test_profile," +
"test_refresh_mtmv," +
"test_spark_load," +
"test_iot_auto_detect_concurrent," +
"zzz_the_end_sentinel_do_not_touch" // keep this line as the last line

// this dir will not be executed
Expand Down
Loading

0 comments on commit 1c18b4c

Please sign in to comment.