Skip to content

Commit

Permalink
Modify local transactions to be executed in parallel (#30462)
Browse files Browse the repository at this point in the history
* Modify local transactions to be executed in parallel

* Fix force startup e2e

* Fix force startup e2e

* Fix force startup e2e
  • Loading branch information
FlyingZC authored Mar 12, 2024
1 parent 38a9d3c commit 605d3f4
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ public final class JDBCExecutor {

private final ExecutorEngine executorEngine;

// TODO add transaction type to ConnectionContext
private final ConnectionContext connectionContext;

/**
Expand Down Expand Up @@ -64,7 +63,7 @@ public <T> List<T> execute(final ExecutionGroupContext<JDBCExecutionUnit> execut
public <T> List<T> execute(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext,
final JDBCExecutorCallback<T> firstCallback, final JDBCExecutorCallback<T> callback) throws SQLException {
try {
return executorEngine.execute(executionGroupContext, firstCallback, callback, connectionContext.getTransactionContext().isInTransaction());
return executorEngine.execute(executionGroupContext, firstCallback, callback, connectionContext.getTransactionContext().isInXATransaction());
} catch (final SQLException ex) {
SQLExecutorExceptionHandler.handleException(ex);
return Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ public Optional<String> getTrafficInstanceId() {
/**
* Clear cursor connection context.
*/
public void clearCursorConnectionContext() {
public void clearCursorContext() {
cursorContext.close();
}

/**
* Clear transaction connection context.
*/
public void clearTransactionConnectionContext() {
public void clearTransactionContext() {
transactionContext.close();
}

Expand All @@ -112,7 +112,7 @@ public Optional<String> getDatabaseName() {
@Override
public void close() {
trafficInstanceId = null;
clearCursorConnectionContext();
clearTransactionConnectionContext();
clearCursorContext();
clearTransactionContext();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,37 @@
* Transaction connection context.
*/
@Getter
@Setter
public final class TransactionConnectionContext implements AutoCloseable {

private volatile String transactionType;

private volatile boolean inTransaction;

@Setter
private volatile long beginMills;

@Setter
private volatile String readWriteSplitReplicaRoute;

/**
* Begin transaction.
*
* @param transactionType transaction type
*/
public void beginTransaction(final String transactionType) {
this.transactionType = transactionType;
inTransaction = true;
}

/**
* Judge is in XA transaction or not.
*
* @return in XA transaction or not
*/
public boolean isInXATransaction() {
return inTransaction && "XA".equals(transactionType);
}

@Override
public void close() {
inTransaction = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public void setAutoCommit(final boolean autoCommit) throws SQLException {
private void processLocalTransaction() throws SQLException {
databaseConnectionManager.setAutoCommit(autoCommit);
if (!autoCommit) {
getConnectionContext().getTransactionContext().setInTransaction(true);
getConnectionContext().getTransactionContext().beginTransaction(String.valueOf(databaseConnectionManager.getConnectionTransaction().getTransactionType()));
}
}

Expand All @@ -190,7 +190,7 @@ private void processDistributedTransaction() throws SQLException {
private void beginDistributedTransaction() throws SQLException {
databaseConnectionManager.close();
databaseConnectionManager.getConnectionTransaction().begin();
getConnectionContext().getTransactionContext().setInTransaction(true);
getConnectionContext().getTransactionContext().beginTransaction(String.valueOf(databaseConnectionManager.getConnectionTransaction().getTransactionType()));
}

/**
Expand All @@ -203,7 +203,7 @@ public void handleAutoCommit() throws SQLException {
if (TransactionType.isDistributedTransaction(databaseConnectionManager.getConnectionTransaction().getTransactionType())) {
beginDistributedTransaction();
} else {
getConnectionContext().getTransactionContext().setInTransaction(true);
getConnectionContext().getTransactionContext().beginTransaction(String.valueOf(databaseConnectionManager.getConnectionTransaction().getTransactionType()));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ private void prepareCursorStatementContext(final CursorAvailable statementContex
prepareCursorStatementContext(statementContext, connectionSession, cursorName);
}
if (statementContext instanceof CloseStatementContext && ((CloseStatementContext) statementContext).getSqlStatement().isCloseAll()) {
connectionSession.getConnectionContext().clearCursorConnectionContext();
connectionSession.getConnectionContext().clearCursorContext();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public BackendTransactionManager(final ProxyDatabaseConnectionManager databaseCo
public void begin() {
if (!connection.getConnectionSession().getTransactionStatus().isInTransaction()) {
connection.getConnectionSession().getTransactionStatus().setInTransaction(true);
getTransactionContext().setInTransaction(true);
getTransactionContext().beginTransaction(String.valueOf(transactionType));
connection.closeHandlers(true);
connection.closeConnections(false);
}
Expand Down Expand Up @@ -100,8 +100,8 @@ public void commit() throws SQLException {
}
connection.getConnectionSession().getTransactionStatus().setInTransaction(false);
connection.getConnectionSession().getTransactionStatus().setExceptionOccur(false);
connection.getConnectionSession().getConnectionContext().clearTransactionConnectionContext();
connection.getConnectionSession().getConnectionContext().clearCursorConnectionContext();
connection.getConnectionSession().getConnectionContext().clearTransactionContext();
connection.getConnectionSession().getConnectionContext().clearCursorContext();
}
}
}
Expand All @@ -124,8 +124,8 @@ public void rollback() throws SQLException {
}
connection.getConnectionSession().getTransactionStatus().setInTransaction(false);
connection.getConnectionSession().getTransactionStatus().setExceptionOccur(false);
connection.getConnectionSession().getConnectionContext().clearTransactionConnectionContext();
connection.getConnectionSession().getConnectionContext().clearCursorConnectionContext();
connection.getConnectionSession().getConnectionContext().clearTransactionContext();
connection.getConnectionSession().getConnectionContext().clearCursorContext();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,16 @@ public ResponseHeader execute() throws SQLException {
private ResponseHeader begin() throws SQLException {
ShardingSpherePreconditions.checkState(!connectionSession.getTransactionStatus().isInTransaction(), XATransactionNestedBeginException::new);
ResponseHeader result = backendHandler.execute();
connectionSession.getConnectionContext().getTransactionContext().setInTransaction(true);
connectionSession.getConnectionContext().getTransactionContext().beginTransaction(String.valueOf(connectionSession.getTransactionStatus().getTransactionType()));
return result;
}

private ResponseHeader finish() throws SQLException {
try {
return backendHandler.execute();
} finally {
connectionSession.getConnectionContext().clearTransactionConnectionContext();
connectionSession.getConnectionContext().clearCursorConnectionContext();
connectionSession.getConnectionContext().clearTransactionContext();
connectionSession.getConnectionContext().clearCursorContext();
}
}
}

0 comments on commit 605d3f4

Please sign in to comment.