From 7fe6fcad63c0066efd9c81dbaf8c36bfaf7d0ae4 Mon Sep 17 00:00:00 2001 From: ZhangCheng Date: Tue, 12 Mar 2024 10:20:56 +0800 Subject: [PATCH 1/4] Modify local transactions to be executed in parallel --- .../execute/engine/driver/jdbc/JDBCExecutor.java | 3 +-- .../session/connection/ConnectionContext.java | 8 ++++---- .../transaction/TransactionConnectionContext.java | 14 +++++++++++++- .../core/connection/ShardingSphereConnection.java | 6 +++--- .../proxy/backend/connector/DatabaseConnector.java | 2 +- .../transaction/BackendTransactionManager.java | 11 ++++++----- .../handler/transaction/TransactionXAHandler.java | 9 ++++++--- 7 files changed, 34 insertions(+), 19 deletions(-) diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutor.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutor.java index 4450d0915ef10..db3445419fc40 100644 --- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutor.java +++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutor.java @@ -35,7 +35,6 @@ public final class JDBCExecutor { private final ExecutorEngine executorEngine; - // TODO add transaction type to ConnectionContext private final ConnectionContext connectionContext; /** @@ -64,7 +63,7 @@ public List execute(final ExecutionGroupContext execut public List execute(final ExecutionGroupContext executionGroupContext, final JDBCExecutorCallback firstCallback, final JDBCExecutorCallback callback) throws SQLException { try { - return executorEngine.execute(executionGroupContext, firstCallback, callback, connectionContext.getTransactionContext().isInTransaction()); + return executorEngine.execute(executionGroupContext, firstCallback, callback, connectionContext.getTransactionContext().isInXATransaction()); } catch (final SQLException ex) { SQLExecutorExceptionHandler.handleException(ex); return Collections.emptyList(); diff --git a/infra/session/src/main/java/org/apache/shardingsphere/infra/session/connection/ConnectionContext.java b/infra/session/src/main/java/org/apache/shardingsphere/infra/session/connection/ConnectionContext.java index 02692a8a04350..33d8a655347eb 100644 --- a/infra/session/src/main/java/org/apache/shardingsphere/infra/session/connection/ConnectionContext.java +++ b/infra/session/src/main/java/org/apache/shardingsphere/infra/session/connection/ConnectionContext.java @@ -78,14 +78,14 @@ public Optional getTrafficInstanceId() { /** * Clear cursor connection context. */ - public void clearCursorConnectionContext() { + public void clearCursorContext() { cursorContext.close(); } /** * Clear transaction connection context. */ - public void clearTransactionConnectionContext() { + public void clearTransactionContext() { transactionContext.close(); } @@ -112,7 +112,7 @@ public Optional getDatabaseName() { @Override public void close() { trafficInstanceId = null; - clearCursorConnectionContext(); - clearTransactionConnectionContext(); + clearCursorContext(); + clearTransactionContext(); } } diff --git a/infra/session/src/main/java/org/apache/shardingsphere/infra/session/connection/transaction/TransactionConnectionContext.java b/infra/session/src/main/java/org/apache/shardingsphere/infra/session/connection/transaction/TransactionConnectionContext.java index ba1a98cbe4012..56cd4979430f0 100644 --- a/infra/session/src/main/java/org/apache/shardingsphere/infra/session/connection/transaction/TransactionConnectionContext.java +++ b/infra/session/src/main/java/org/apache/shardingsphere/infra/session/connection/transaction/TransactionConnectionContext.java @@ -24,15 +24,27 @@ * Transaction connection context. */ @Getter -@Setter public final class TransactionConnectionContext implements AutoCloseable { + private volatile String transactionType; + private volatile boolean inTransaction; + @Setter private volatile long beginMills; + @Setter private volatile String readWriteSplitReplicaRoute; + public void beginTransaction(final String transactionType) { + this.transactionType = transactionType; + inTransaction = true; + } + + public boolean isInXATransaction() { + return inTransaction && "XA".equals(transactionType); + } + @Override public void close() { inTransaction = false; diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java index 92b6d93853d6d..7f63560040f68 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java @@ -170,7 +170,7 @@ public void setAutoCommit(final boolean autoCommit) throws SQLException { private void processLocalTransaction() throws SQLException { databaseConnectionManager.setAutoCommit(autoCommit); if (!autoCommit) { - getConnectionContext().getTransactionContext().setInTransaction(true); + getConnectionContext().getTransactionContext().beginTransaction(String.valueOf(databaseConnectionManager.getConnectionTransaction().getTransactionType())); } } @@ -190,7 +190,7 @@ private void processDistributedTransaction() throws SQLException { private void beginDistributedTransaction() throws SQLException { databaseConnectionManager.close(); databaseConnectionManager.getConnectionTransaction().begin(); - getConnectionContext().getTransactionContext().setInTransaction(true); + getConnectionContext().getTransactionContext().beginTransaction(String.valueOf(databaseConnectionManager.getConnectionTransaction().getTransactionType())); } /** @@ -203,7 +203,7 @@ public void handleAutoCommit() throws SQLException { if (TransactionType.isDistributedTransaction(databaseConnectionManager.getConnectionTransaction().getTransactionType())) { beginDistributedTransaction(); } else { - getConnectionContext().getTransactionContext().setInTransaction(true); + getConnectionContext().getTransactionContext().beginTransaction(String.valueOf(databaseConnectionManager.getConnectionTransaction().getTransactionType())); } } } diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java index e90833b01e61c..f45ec7c73a183 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java @@ -261,7 +261,7 @@ private void prepareCursorStatementContext(final CursorAvailable statementContex prepareCursorStatementContext(statementContext, connectionSession, cursorName); } if (statementContext instanceof CloseStatementContext && ((CloseStatementContext) statementContext).getSqlStatement().isCloseAll()) { - connectionSession.getConnectionContext().clearCursorConnectionContext(); + connectionSession.getConnectionContext().clearCursorContext(); } } diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java index 2219e93bba47d..6547ce1414120 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java @@ -65,7 +65,8 @@ public BackendTransactionManager(final ProxyDatabaseConnectionManager databaseCo public void begin() { if (!connection.getConnectionSession().getTransactionStatus().isInTransaction()) { connection.getConnectionSession().getTransactionStatus().setInTransaction(true); - getTransactionContext().setInTransaction(true); + getTransactionContext().beginTransaction( + String.valueOf(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(TransactionRule.class).getDefaultType())); connection.closeHandlers(true); connection.closeConnections(false); } @@ -100,8 +101,8 @@ public void commit() throws SQLException { } connection.getConnectionSession().getTransactionStatus().setInTransaction(false); connection.getConnectionSession().getTransactionStatus().setExceptionOccur(false); - connection.getConnectionSession().getConnectionContext().clearTransactionConnectionContext(); - connection.getConnectionSession().getConnectionContext().clearCursorConnectionContext(); + connection.getConnectionSession().getConnectionContext().clearTransactionContext(); + connection.getConnectionSession().getConnectionContext().clearCursorContext(); } } } @@ -124,8 +125,8 @@ public void rollback() throws SQLException { } connection.getConnectionSession().getTransactionStatus().setInTransaction(false); connection.getConnectionSession().getTransactionStatus().setExceptionOccur(false); - connection.getConnectionSession().getConnectionContext().clearTransactionConnectionContext(); - connection.getConnectionSession().getConnectionContext().clearCursorConnectionContext(); + connection.getConnectionSession().getConnectionContext().clearTransactionContext(); + connection.getConnectionSession().getConnectionContext().clearCursorContext(); } } } diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/transaction/TransactionXAHandler.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/transaction/TransactionXAHandler.java index c13397453f7d9..3169f85024629 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/transaction/TransactionXAHandler.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/transaction/TransactionXAHandler.java @@ -24,6 +24,7 @@ import org.apache.shardingsphere.infra.session.query.QueryContext; import org.apache.shardingsphere.proxy.backend.connector.DatabaseConnector; import org.apache.shardingsphere.proxy.backend.connector.DatabaseConnectorFactory; +import org.apache.shardingsphere.proxy.backend.context.ProxyContext; import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandler; import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseRow; import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader; @@ -33,6 +34,7 @@ import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.xa.XARecoveryStatement; import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.xa.XARollbackStatement; import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.xa.XAStatement; +import org.apache.shardingsphere.transaction.rule.TransactionRule; import org.apache.shardingsphere.transaction.xa.jta.exception.XATransactionNestedBeginException; import java.sql.SQLException; @@ -85,7 +87,8 @@ public ResponseHeader execute() throws SQLException { private ResponseHeader begin() throws SQLException { ShardingSpherePreconditions.checkState(!connectionSession.getTransactionStatus().isInTransaction(), XATransactionNestedBeginException::new); ResponseHeader result = backendHandler.execute(); - connectionSession.getConnectionContext().getTransactionContext().setInTransaction(true); + connectionSession.getConnectionContext().getTransactionContext().beginTransaction( + String.valueOf(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(TransactionRule.class).getDefaultType())); return result; } @@ -93,8 +96,8 @@ private ResponseHeader finish() throws SQLException { try { return backendHandler.execute(); } finally { - connectionSession.getConnectionContext().clearTransactionConnectionContext(); - connectionSession.getConnectionContext().clearCursorConnectionContext(); + connectionSession.getConnectionContext().clearTransactionContext(); + connectionSession.getConnectionContext().clearCursorContext(); } } } From bd56f3cfdb6f5c8198ae54ef5a97a33aa716bff5 Mon Sep 17 00:00:00 2001 From: ZhangCheng Date: Tue, 12 Mar 2024 11:10:54 +0800 Subject: [PATCH 2/4] Fix force startup e2e --- .../transaction/TransactionConnectionContext.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/infra/session/src/main/java/org/apache/shardingsphere/infra/session/connection/transaction/TransactionConnectionContext.java b/infra/session/src/main/java/org/apache/shardingsphere/infra/session/connection/transaction/TransactionConnectionContext.java index 56cd4979430f0..a3c419a48ccb5 100644 --- a/infra/session/src/main/java/org/apache/shardingsphere/infra/session/connection/transaction/TransactionConnectionContext.java +++ b/infra/session/src/main/java/org/apache/shardingsphere/infra/session/connection/transaction/TransactionConnectionContext.java @@ -35,12 +35,22 @@ public final class TransactionConnectionContext implements AutoCloseable { @Setter private volatile String readWriteSplitReplicaRoute; - + + /** + * Begin transaction. + * + * @param transactionType transaction type + */ public void beginTransaction(final String transactionType) { this.transactionType = transactionType; inTransaction = true; } + /** + * Judge is in XA transaction or not. + * + * @return in XA transaction or not + */ public boolean isInXATransaction() { return inTransaction && "XA".equals(transactionType); } From b3fc2330461caf77e570160e2f979b67cf21f0dd Mon Sep 17 00:00:00 2001 From: ZhangCheng Date: Tue, 12 Mar 2024 11:18:53 +0800 Subject: [PATCH 3/4] Fix force startup e2e --- .../connection/transaction/TransactionConnectionContext.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/infra/session/src/main/java/org/apache/shardingsphere/infra/session/connection/transaction/TransactionConnectionContext.java b/infra/session/src/main/java/org/apache/shardingsphere/infra/session/connection/transaction/TransactionConnectionContext.java index a3c419a48ccb5..c7f45b40bce81 100644 --- a/infra/session/src/main/java/org/apache/shardingsphere/infra/session/connection/transaction/TransactionConnectionContext.java +++ b/infra/session/src/main/java/org/apache/shardingsphere/infra/session/connection/transaction/TransactionConnectionContext.java @@ -35,7 +35,7 @@ public final class TransactionConnectionContext implements AutoCloseable { @Setter private volatile String readWriteSplitReplicaRoute; - + /** * Begin transaction. * From 5de22dcb2ecbb49b853f1215b9d7fb69f98ea425 Mon Sep 17 00:00:00 2001 From: ZhangCheng Date: Tue, 12 Mar 2024 12:00:38 +0800 Subject: [PATCH 4/4] Fix force startup e2e --- .../jdbc/transaction/BackendTransactionManager.java | 3 +-- .../backend/handler/transaction/TransactionXAHandler.java | 5 +---- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java index 6547ce1414120..9718a3a265580 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java @@ -65,8 +65,7 @@ public BackendTransactionManager(final ProxyDatabaseConnectionManager databaseCo public void begin() { if (!connection.getConnectionSession().getTransactionStatus().isInTransaction()) { connection.getConnectionSession().getTransactionStatus().setInTransaction(true); - getTransactionContext().beginTransaction( - String.valueOf(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(TransactionRule.class).getDefaultType())); + getTransactionContext().beginTransaction(String.valueOf(transactionType)); connection.closeHandlers(true); connection.closeConnections(false); } diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/transaction/TransactionXAHandler.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/transaction/TransactionXAHandler.java index 3169f85024629..8e9fb34b5005e 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/transaction/TransactionXAHandler.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/transaction/TransactionXAHandler.java @@ -24,7 +24,6 @@ import org.apache.shardingsphere.infra.session.query.QueryContext; import org.apache.shardingsphere.proxy.backend.connector.DatabaseConnector; import org.apache.shardingsphere.proxy.backend.connector.DatabaseConnectorFactory; -import org.apache.shardingsphere.proxy.backend.context.ProxyContext; import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandler; import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseRow; import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader; @@ -34,7 +33,6 @@ import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.xa.XARecoveryStatement; import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.xa.XARollbackStatement; import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.xa.XAStatement; -import org.apache.shardingsphere.transaction.rule.TransactionRule; import org.apache.shardingsphere.transaction.xa.jta.exception.XATransactionNestedBeginException; import java.sql.SQLException; @@ -87,8 +85,7 @@ public ResponseHeader execute() throws SQLException { private ResponseHeader begin() throws SQLException { ShardingSpherePreconditions.checkState(!connectionSession.getTransactionStatus().isInTransaction(), XATransactionNestedBeginException::new); ResponseHeader result = backendHandler.execute(); - connectionSession.getConnectionContext().getTransactionContext().beginTransaction( - String.valueOf(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(TransactionRule.class).getDefaultType())); + connectionSession.getConnectionContext().getTransactionContext().beginTransaction(String.valueOf(connectionSession.getTransactionStatus().getTransactionType())); return result; }