Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[branch-2.1](insert-overwrite) Fix insert overwrite auto detect transaction safe (#38103) #38442

Merged
merged 1 commit into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1108,10 +1108,13 @@ public Set<String> getPartitionNames() {
return Sets.newHashSet(nameToPartition.keySet());
}

public List<String> uncheckedGetPartNamesById(List<Long> partitionIds) {
// for those elements equal in partiton ids, get their names.
public List<String> getEqualPartitionNames(List<Long> partitionIds1, List<Long> partitionIds2) {
List<String> names = new ArrayList<String>();
for (Long id : partitionIds) {
names.add(idToPartition.get(id).getName());
for (int i = 0; i < partitionIds1.size(); i++) {
if (partitionIds1.get(i).equals(partitionIds2.get(i))) {
names.add(getPartition(partitionIds1.get(i)).getName());
}
}
return names;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class InsertOverwriteManager extends MasterDaemon implements Writable {
// but we only change one time and save the relations in partitionPairs. they're protected by taskLocks
@SerializedName(value = "taskLocks")
private Map<Long, ReentrantLock> taskLocks = Maps.newConcurrentMap();
// <groupId, <oldPartId, newPartId>>
// <groupId, <oldPartId, newPartId>>. no need concern which task it belongs to.
@SerializedName(value = "partitionPairs")
private Map<Long, Map<Long, Long>> partitionPairs = Maps.newConcurrentMap();

Expand Down Expand Up @@ -91,7 +91,7 @@ public long registerTask(long dbId, long tableId, List<String> tempPartitionName
*
* @return group id, like a transaction id.
*/
public long preRegisterTask() {
public long registerTaskGroup() {
long groupId = Env.getCurrentEnv().getNextId();
taskGroups.put(groupId, new ArrayList<Long>());
taskLocks.put(groupId, new ReentrantLock());
Expand All @@ -107,44 +107,81 @@ public void registerTaskInGroup(long groupId, long taskId) {
taskGroups.get(groupId).add(taskId);
}

public List<Long> tryReplacePartitionIds(long groupId, List<Long> oldPartitionIds) {
/**
* this func should in lock scope of getLock(groupId)
*
* @param newIds if have replaced, replace with new. otherwise itself.
*/
public boolean tryReplacePartitionIds(long groupId, List<Long> oldPartitionIds, List<Long> newIds) {
Map<Long, Long> relations = partitionPairs.get(groupId);
List<Long> newIds = new ArrayList<Long>();
for (Long id : oldPartitionIds) {
boolean needReplace = false;
for (int i = 0; i < oldPartitionIds.size(); i++) {
long id = oldPartitionIds.get(i);
if (relations.containsKey(id)) {
// if we replaced it. then return new one.
newIds.add(relations.get(id));
} else {
// otherwise itself. we will deal it soon.
newIds.add(id);
needReplace = true;
}
}
return newIds;
return needReplace;
}

// this func should in lock scope of getLock(groupId)
public void recordPartitionPairs(long groupId, List<Long> oldIds, List<Long> newIds) {
Map<Long, Long> relations = partitionPairs.get(groupId);
Preconditions.checkArgument(oldIds.size() == newIds.size());
for (int i = 0; i < oldIds.size(); i++) {
relations.put(oldIds.get(i), newIds.get(i));
if (LOG.isDebugEnabled()) {
LOG.debug("recorded partition pairs: [" + oldIds.get(i) + ", " + newIds.get(i) + "]");
}
}
}

// lock is a symbol of TaskGroup exist. if not, means already failed.
public ReentrantLock getLock(long groupId) {
return taskLocks.get(groupId);
}

// When goes into failure, some BE may still not know and send new request.
// it will cause ConcurrentModification or NullPointer.
public void taskGroupFail(long groupId) {
LOG.info("insert overwrite auto detect partition task group [" + groupId + "] failed");
for (Long taskId : taskGroups.get(groupId)) {
taskFail(taskId);
ReentrantLock lock = getLock(groupId);
lock.lock();
try {
// will rollback temp partitions in `taskFail`
for (Long taskId : taskGroups.get(groupId)) {
taskFail(taskId);
}
cleanTaskGroup(groupId);
} finally {
lock.unlock();
}
cleanTaskGroup(groupId);
}

public void taskGroupSuccess(long groupId) {
// here we will make all raplacement of this group visiable. if someone fails, nothing happen.
public void taskGroupSuccess(long groupId, OlapTable targetTable) throws DdlException {
try {
Map<Long, Long> relations = partitionPairs.get(groupId);
ArrayList<String> oldNames = new ArrayList<>();
ArrayList<String> newNames = new ArrayList<>();
for (Entry<Long, Long> partitionPair : relations.entrySet()) {
oldNames.add(targetTable.getPartition(partitionPair.getKey()).getName());
newNames.add(targetTable.getPartition(partitionPair.getValue()).getName());
}
InsertOverwriteUtil.replacePartition(targetTable, oldNames, newNames);
} catch (Exception e) {
LOG.warn("insert overwrite task making replacement failed because " + e.getMessage()
+ "all new partition will not be visible and will be recycled by partition GC.");
throw e;
}
LOG.info("insert overwrite auto detect partition task group [" + groupId + "] succeed");
for (Long taskId : taskGroups.get(groupId)) {
Env.getCurrentEnv().getEditLog()
.logInsertOverwrite(new InsertOverwriteLog(taskId, tasks.get(taskId), InsertOverwriteOpType.ADD));
taskSuccess(taskId);
}
cleanTaskGroup(groupId);
Expand All @@ -164,6 +201,9 @@ private void cleanTaskGroup(long groupId) {
public void taskFail(long taskId) {
LOG.info("insert overwrite task [" + taskId + "] failed");
boolean rollback = rollback(taskId);
if (!rollback) {
LOG.warn("roll back task [" + taskId + "] failed");
}
if (rollback) {
removeTask(taskId);
} else {
Expand Down Expand Up @@ -192,6 +232,7 @@ public void allTaskFail() {
}
}

// cancel it. should try to remove them after.
private void cancelTask(long taskId) {
if (tasks.containsKey(taskId)) {
LOG.info("cancel insert overwrite task: {}", tasks.get(taskId));
Expand All @@ -201,6 +242,7 @@ private void cancelTask(long taskId) {
}
}

// task and partitions has been removed. it's safe to remove task.
private void removeTask(long taskId) {
if (tasks.containsKey(taskId)) {
LOG.info("remove insert overwrite task: {}", tasks.get(taskId));
Expand All @@ -222,7 +264,7 @@ private boolean rollback(long taskId) {
try {
olapTable = task.getTable();
} catch (DdlException e) {
LOG.warn("can not get table, task: {}", task);
LOG.warn("can not get table, task: {}, reason: {}", task, e.getMessage());
return true;
}
return InsertOverwriteUtil.dropPartitions(olapTable, task.getTempPartitionNames());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public static void addTempPartitions(TableIf tableIf, List<String> partitionName
for (int i = 0; i < partitionNames.size(); i++) {
Env.getCurrentEnv().addPartitionLike((Database) tableIf.getDatabase(), tableIf.getName(),
new AddPartitionLikeClause(tempPartitionNames.get(i), partitionNames.get(i), true));
LOG.info("successfully add temp partition [{}] for [{}]", tempPartitionNames.get(i), tableIf.getName());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,12 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
try {
if (isAutoDetectOverwrite()) {
// taskId here is a group id. it contains all replace tasks made and registered in rpc process.
taskId = Env.getCurrentEnv().getInsertOverwriteManager().preRegisterTask();
// When inserting, BE will call to replace partition by FrontendService. FE do the real
// add&replacement and return replace result. So there's no need to do anything else.
taskId = Env.getCurrentEnv().getInsertOverwriteManager().registerTaskGroup();
// When inserting, BE will call to replace partition by FrontendService. FE will register new temp
// partitions and return. for transactional, the replacement will really occur when insert successed,
// i.e. `insertInto` finished. then we call taskGroupSuccess to make replacement.
insertInto(ctx, executor, taskId);
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId);
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId, (OlapTable) targetTable);
} else {
List<String> tempPartitionNames = InsertOverwriteUtil.generateTempPartitionNames(partitionNames);
taskId = Env.getCurrentEnv().getInsertOverwriteManager()
Expand All @@ -184,7 +185,7 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
Env.getCurrentEnv().getInsertOverwriteManager().taskSuccess(taskId);
}
} catch (Exception e) {
LOG.warn("insert into overwrite failed");
LOG.warn("insert into overwrite failed with task(or group) id " + taskId);
if (isAutoDetectOverwrite()) {
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupFail(taskId);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,6 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Frontend service used to serve all request for this frontend through
// thrift protocol
Expand Down Expand Up @@ -3568,7 +3567,7 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request
LOG.info("Receive replace partition request: {}", request);
long dbId = request.getDbId();
long tableId = request.getTableId();
List<Long> partitionIds = request.getPartitionIds();
List<Long> reqPartitionIds = request.getPartitionIds();
long taskGroupId = request.getOverwriteGroupId();
TReplacePartitionResult result = new TReplacePartitionResult();
TStatus errorStatus = new TStatus(TStatusCode.RUNTIME_ERROR);
Expand Down Expand Up @@ -3607,41 +3606,60 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request
OlapTable olapTable = (OlapTable) table;
InsertOverwriteManager overwriteManager = Env.getCurrentEnv().getInsertOverwriteManager();
ReentrantLock taskLock = overwriteManager.getLock(taskGroupId);
List<String> allReqPartNames; // all request partitions
if (taskLock == null) {
errorStatus.setErrorMsgs(Lists
.newArrayList(new String("cannot find task group " + taskGroupId + ", maybe already failed.")));
result.setStatus(errorStatus);
LOG.warn("send create partition error status: {}", result);
return result;
}

ArrayList<Long> resultPartitionIds = new ArrayList<>(); // [1 2 5 6] -> [7 8 5 6]
ArrayList<Long> pendingPartitionIds = new ArrayList<>(); // pending: [1 2]
ArrayList<Long> newPartitionIds = new ArrayList<>(); // requested temp partition ids. for [7 8]
boolean needReplace = false;
try {
taskLock.lock();
// double check lock. maybe taskLock is not null, but has been removed from the Map. means the task failed.
if (overwriteManager.getLock(taskGroupId) == null) {
errorStatus.setErrorMsgs(Lists
.newArrayList(new String("cannot find task group " + taskGroupId + ", maybe already failed.")));
result.setStatus(errorStatus);
LOG.warn("send create partition error status: {}", result);
return result;
}

// we dont lock the table. other thread in this txn will be controled by taskLock.
// if we have already replaced. dont do it again, but acquire the recorded new partition directly.
// if we have already replaced, dont do it again, but acquire the recorded new partition directly.
// if not by this txn, just let it fail naturally is ok.
List<Long> replacedPartIds = overwriteManager.tryReplacePartitionIds(taskGroupId, partitionIds);
// here if replacedPartIds still have null. this will throw exception.
allReqPartNames = olapTable.uncheckedGetPartNamesById(replacedPartIds);

List<Long> pendingPartitionIds = IntStream.range(0, partitionIds.size())
.filter(i -> partitionIds.get(i) == replacedPartIds.get(i)) // equal means not replaced
.mapToObj(partitionIds::get)
.collect(Collectors.toList());
// from here we ONLY deal the pending partitions. not include the dealed(by others).
if (!pendingPartitionIds.isEmpty()) {
// below two must have same order inner.
List<String> pendingPartitionNames = olapTable.uncheckedGetPartNamesById(pendingPartitionIds);
List<String> tempPartitionNames = InsertOverwriteUtil
.generateTempPartitionNames(pendingPartitionNames);
needReplace = overwriteManager.tryReplacePartitionIds(taskGroupId, reqPartitionIds, resultPartitionIds);
// request: [1 2 3 4] result: [1 2 5 6] means ONLY 1 and 2 need replace.
if (needReplace) {
// names for [1 2]
List<String> pendingPartitionNames = olapTable.getEqualPartitionNames(reqPartitionIds,
resultPartitionIds);
for (String name : pendingPartitionNames) {
pendingPartitionIds.add(olapTable.getPartition(name).getId()); // put [1 2]
}

long taskId = overwriteManager.registerTask(dbId, tableId, tempPartitionNames);
// names for [7 8]
List<String> newTempNames = InsertOverwriteUtil
.generateTempPartitionNames(pendingPartitionNames);
// a task means one time insert overwrite
long taskId = overwriteManager.registerTask(dbId, tableId, newTempNames);
overwriteManager.registerTaskInGroup(taskGroupId, taskId);
InsertOverwriteUtil.addTempPartitions(olapTable, pendingPartitionNames, tempPartitionNames);
InsertOverwriteUtil.replacePartition(olapTable, pendingPartitionNames, tempPartitionNames);
InsertOverwriteUtil.addTempPartitions(olapTable, pendingPartitionNames, newTempNames);
// now temp partitions are bumped up and use new names. we get their ids and record them.
List<Long> newPartitionIds = new ArrayList<Long>();
for (String newPartName : pendingPartitionNames) {
newPartitionIds.add(olapTable.getPartition(newPartName).getId());
for (String newPartName : newTempNames) {
newPartitionIds.add(olapTable.getPartition(newPartName).getId()); // put [7 8]
}
overwriteManager.recordPartitionPairs(taskGroupId, pendingPartitionIds, newPartitionIds);

if (LOG.isDebugEnabled()) {
LOG.debug("partition replacement: ");
for (int i = 0; i < pendingPartitionIds.size(); i++) {
LOG.debug("[" + pendingPartitionIds.get(i) + ", " + newPartitionIds.get(i) + "], ");
LOG.debug("[" + pendingPartitionIds.get(i) + " - " + pendingPartitionNames.get(i) + ", "
+ newPartitionIds.get(i) + " - " + newTempNames.get(i) + "], ");
}
}
}
Expand All @@ -3654,15 +3672,38 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request
taskLock.unlock();
}

// build partition & tablets. now all partitions in allReqPartNames are replaced
// an recorded.
// so they won't be changed again. if other transaction changing it. just let it
// fail.
List<TOlapTablePartition> partitions = Lists.newArrayList();
List<TTabletLocation> tablets = Lists.newArrayList();
// result: [1 2 5 6], make it [7 8 5 6]
int idx = 0;
if (needReplace) {
for (int i = 0; i < reqPartitionIds.size(); i++) {
if (reqPartitionIds.get(i).equals(resultPartitionIds.get(i))) {
resultPartitionIds.set(i, newPartitionIds.get(idx++));
}
}
}
if (idx != newPartitionIds.size()) {
errorStatus.addToErrorMsgs("changed partition number " + idx + " is not correct");
result.setStatus(errorStatus);
LOG.warn("send create partition error status: {}", result);
return result;
}

if (LOG.isDebugEnabled()) {
LOG.debug("replace partition origin ids: ["
+ String.join(", ", reqPartitionIds.stream().map(String::valueOf).collect(Collectors.toList()))
+ ']');
LOG.debug("replace partition result ids: ["
+ String.join(", ", resultPartitionIds.stream().map(String::valueOf).collect(Collectors.toList()))
+ ']');
}

// build partition & tablets. now all partitions in allReqPartNames are replaced an recorded.
// so they won't be changed again. if other transaction changing it. just let it fail.
List<TOlapTablePartition> partitions = new ArrayList<>();
List<TTabletLocation> tablets = new ArrayList<>();
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
for (String partitionName : allReqPartNames) {
Partition partition = table.getPartition(partitionName);
for (long partitionId : resultPartitionIds) {
Partition partition = olapTable.getPartition(partitionId);
TOlapTablePartition tPartition = new TOlapTablePartition();
tPartition.setId(partition.getId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,13 @@ suite("test_iot_auto_detect_concurrent") {
thread5.join()
// suppose result: success zero or one
if (success_status) { // success zero
log.info("test 1: success zero")
result = sql " select count(k0) from test_concurrent_write; "
assertEquals(result[0][0], 1000)
result = sql " select count(distinct k0) from test_concurrent_write; "
assertEquals(result[0][0], 1000)
} else { // success one
log.info("test 1: success one")
result = sql " select count(k0) from test_concurrent_write; "
assertEquals(result[0][0], 100)
result = sql " select count(distinct k0) from test_concurrent_write; "
Expand Down
Loading