Skip to content

Commit

Permalink
retry rename table in interceptor
Browse files Browse the repository at this point in the history
  • Loading branch information
krihy committed Sep 11, 2023
1 parent 17825e4 commit 19343d1
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,20 +77,6 @@ public void test_osc_swap_table_origin_table_drop_successful() {
this::checkSwapTableAndRenameDrop);
}

@Test
public void test_osc_swap_table_failed_drop_new_table() {

String originTableName = getOriginTableName();
doThrow(new BadArgumentException(ErrorCodes.BadArgument, "bad argument"))
.when(dbSessionManager)
.killAllSessions(Mockito.any(ConnectionSession.class), Mockito.any(Predicate.class), Mockito.anyInt());

executeOscSwapTable(
originTableName,
c -> c.setOriginTableCleanStrategy(OriginTableCleanStrategy.ORIGIN_TABLE_DROP),
this::checkSwapTableFailedAndDropNewTable);
}

private void executeOscSwapTable(String originTableName,
Consumer<OnlineSchemaChangeParameters> changeParametersConsumer,
Consumer<OnlineSchemaChangeScheduleTaskParameters> resultAssert) {
Expand Down Expand Up @@ -185,29 +171,5 @@ private void checkSwapTableAndRenameDrop(OnlineSchemaChangeScheduleTaskParameter
Assert.assertFalse(CollectionUtils.isEmpty(originTable));
}

private void checkSwapTableFailedAndDropNewTable(OnlineSchemaChangeScheduleTaskParameters taskParameters) {
DBSchemaAccessor dbSchemaAccessor = DBSchemaAccessors.create(connectionSession);
List<String> renamedTable = dbSchemaAccessor.showTablesLike(taskParameters.getDatabaseName(),
taskParameters.getRenamedTableName());

List<String> originTable = dbSchemaAccessor.showTablesLike(taskParameters.getDatabaseName(),
taskParameters.getOriginTableNameUnwrapped());

List<String> newTable = dbSchemaAccessor.showTablesLike(taskParameters.getDatabaseName(),
taskParameters.getNewTableNameUnwrapped());

Assert.assertTrue(CollectionUtils.isEmpty(renamedTable));
Assert.assertFalse(CollectionUtils.isEmpty(originTable));
Assert.assertTrue(CollectionUtils.isEmpty(newTable));

List<DBTableColumn> tableColumnFromNew = dbSchemaAccessor.listTableColumns(taskParameters.getDatabaseName(),
taskParameters.getOriginTableNameUnwrapped());

Optional<DBTableColumn> name1Col = tableColumnFromNew.stream()
.filter(a -> a.getName().equalsIgnoreCase("name1"))
.findFirst();
Assert.assertTrue(name1Col.isPresent());
Assert.assertEquals(20L, name1Col.get().getMaxLength().longValue());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,14 @@
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import com.oceanbase.odc.core.session.ConnectionSession;
import com.oceanbase.odc.core.session.ConnectionSessionConstants;
import com.oceanbase.odc.core.shared.constant.ErrorCodes;
import com.oceanbase.odc.service.onlineschemachange.exception.OscException;
import com.oceanbase.odc.service.onlineschemachange.model.OnlineSchemaChangeParameters;
import com.oceanbase.odc.service.onlineschemachange.model.OnlineSchemaChangeScheduleTaskParameters;
import com.oceanbase.odc.service.onlineschemachange.model.OriginTableCleanStrategy;
import com.oceanbase.odc.service.session.DBSessionManageFacade;

import lombok.extern.slf4j.Slf4j;
Expand All @@ -55,9 +53,6 @@ public DefaultRenameTableInvoker(ConnectionSession connSession,
RenameTableInterceptor lockInterceptor =
lockRenameTableFactory.generate(connSession, dbSessionManageFacade);
interceptors.add(lockInterceptor);
HandlerTableInterceptor handlerTableInterceptor =
new HandlerTableInterceptor(connSession.getSyncJdbcExecutor(ConnectionSessionConstants.BACKEND_DS_KEY));
interceptors.add(handlerTableInterceptor);
interceptors.add(new ForeignKeyInterceptor(connSession));
this.interceptors = interceptors;
this.connectionSession = connSession;
Expand All @@ -70,38 +65,27 @@ public void invoke(OnlineSchemaChangeScheduleTaskParameters taskParameters,
OnlineSchemaChangeParameters parameters) {
RenameTableParameters renameTableParameters = getRenameTableParameters(taskParameters, parameters);
try {
preRename(renameTableParameters);
retryRename(taskParameters, parameters);
renameSucceed(renameTableParameters);
} catch (Exception ex) {
renameFailed(renameTableParameters);
throw new OscException(ErrorCodes.Unexpected, "rename table occur error", ex);
retryRename(taskParameters, parameters, renameTableParameters);
dropOldTable(renameTableParameters);
} finally {
try {
postRenamed(renameTableParameters);
} catch (Throwable throwable) {
// ignore
}
cleanUp(taskParameters);
renameBackHandler.renameBack(connectionSession, taskParameters);
}
}

private void retryRename(OnlineSchemaChangeScheduleTaskParameters taskParameters,
OnlineSchemaChangeParameters parameters) {
OnlineSchemaChangeParameters parameters, RenameTableParameters renameTableParameters) {

Integer swapTableNameRetryTimes = parameters.getSwapTableNameRetryTimes();
if (swapTableNameRetryTimes == 0) {
swapTableNameRetryTimes = 1;
}

AtomicInteger retryTime = new AtomicInteger();
boolean succeed = false;
while (retryTime.getAndIncrement() < swapTableNameRetryTimes) {
if (succeed) {
break;
}
succeed = doTryRename(taskParameters, retryTime);
}
boolean succeed;
do {
succeed = doTryRename(taskParameters, renameTableParameters, retryTime);
} while (retryTime.incrementAndGet() < swapTableNameRetryTimes && !succeed);

if (!succeed) {
throw new IllegalStateException(
MessageFormat.format("Swap table name failed after {0} times", retryTime.get()));
Expand All @@ -110,20 +94,31 @@ private void retryRename(OnlineSchemaChangeScheduleTaskParameters taskParameters
}
}

private boolean doTryRename(OnlineSchemaChangeScheduleTaskParameters taskParameters, AtomicInteger retryTime) {
AtomicBoolean atomicResult = new AtomicBoolean(false);
private boolean doTryRename(OnlineSchemaChangeScheduleTaskParameters taskParameters,
RenameTableParameters renameTableParameters, AtomicInteger retryTime) {
boolean succeed = false;
try {
preRename(renameTableParameters);
renameTableHandler.rename(taskParameters.getDatabaseName(), taskParameters.getOriginTableName(),
taskParameters.getRenamedTableName(), taskParameters.getNewTableName());
atomicResult.getAndSet(true);
succeed = true;
renameSucceed(renameTableParameters);
} catch (Exception e) {
log.warn(MessageFormat.format("Swap table name occur error, retry time {0}",
retryTime.get()), e);
renameBackHandler.renameBack(connectionSession, taskParameters);
renameFailed(renameTableParameters);
} finally {
try {
postRenamed(renameTableParameters);
} catch (Throwable throwable) {
// ignore
}
}
return atomicResult.get();
return succeed;
}


private RenameTableParameters getRenameTableParameters(OnlineSchemaChangeScheduleTaskParameters taskParameters,
OnlineSchemaChangeParameters parameters) {
// set lock table max timeout is 120s
Expand Down Expand Up @@ -164,8 +159,28 @@ private void reverseConsumerInterceptor(Consumer<RenameTableInterceptor> interce
}
}

private void cleanUp(OnlineSchemaChangeScheduleTaskParameters taskParameters) {
renameBackHandler.renameBack(connectionSession, taskParameters);
private void dropOldTable(RenameTableParameters parameters) {
if (parameters.getOriginTableCleanStrategy() == OriginTableCleanStrategy.ORIGIN_TABLE_DROP) {
log.info("Because origin table clean strategy is {}, so we drop the old table. ",
parameters.getOriginTableCleanStrategy());
String oldTable = getWithSchema(parameters.getSchemaName(), parameters.getRenamedTableName());
dropTable(oldTable);
}
}

private void dropTable(String tableName) {
try {
// drop table
connectionSession.getSyncJdbcExecutor(ConnectionSessionConstants.BACKEND_DS_KEY)
.execute("DROP TABLE " + tableName);
log.info("DROP TABLE {}", tableName);
} catch (Exception exception) {
log.warn("Drop old table {} occur error {} ", tableName, exception.getMessage());
}
}

private String getWithSchema(String schema, String tableName) {
return schema + "." + tableName;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public RenameTableInterceptor generate(ConnectionSession connectionSession,
PreConditions.notNull(obVersion, "obVersion");

if (connectType == ConnectType.OB_MYSQL || connectType == ConnectType.CLOUD_OB_MYSQL) {
if (VersionUtils.isGreaterThanOrEqualsTo(obVersion, "4.2.0")) {
if (VersionUtils.isGreaterThanOrEqualsTo(obVersion, "4.3.0")) {
// OB 版本 >= 4.2.0
return new LockTableInterceptor();
} else {
Expand Down

0 comments on commit 19343d1

Please sign in to comment.