diff --git a/server/src/main/java/io/seata/server/session/BranchSession.java b/server/src/main/java/io/seata/server/session/BranchSession.java index b6c1e139617..ead43a145c7 100644 --- a/server/src/main/java/io/seata/server/session/BranchSession.java +++ b/server/src/main/java/io/seata/server/session/BranchSession.java @@ -17,15 +17,18 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import io.seata.common.util.CompressUtil; + import io.seata.common.util.BufferUtils; +import io.seata.common.util.CompressUtil; import io.seata.core.exception.TransactionException; import io.seata.core.model.BranchStatus; import io.seata.core.model.BranchType; import io.seata.core.model.LockStatus; +import io.seata.server.lock.LockManager; import io.seata.server.lock.LockerManagerFactory; import io.seata.server.storage.file.lock.FileLocker; import io.seata.server.store.SessionStorable; @@ -33,7 +36,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import static io.seata.core.model.LockStatus.Locked; /** @@ -72,8 +74,18 @@ public class BranchSession implements Lockable, Comparable, Sessi private LockStatus lockStatus = Locked; - private ConcurrentMap> lockHolder - = new ConcurrentHashMap<>(); + private final Map> lockHolder; + + private final LockManager lockManager = LockerManagerFactory.getLockManager(); + + public BranchSession() { + lockHolder = new ConcurrentHashMap<>(2); + } + + public BranchSession(BranchType branchType) { + this.branchType = branchType; + this.lockHolder = branchType == BranchType.AT ? new ConcurrentHashMap<>(8) : Collections.emptyMap(); + } /** * Gets application data. @@ -274,7 +286,7 @@ public boolean canBeCommittedAsync() { * * @return the lock holder */ - public ConcurrentMap> getLockHolder() { + public Map> getLockHolder() { return lockHolder; } @@ -284,16 +296,16 @@ public boolean lock() throws TransactionException { } public boolean lock(boolean autoCommit, boolean skipCheckLock) throws TransactionException { - if (this.getBranchType().equals(BranchType.AT)) { - return LockerManagerFactory.getLockManager().acquireLock(this, autoCommit, skipCheckLock); + if (this.branchType.equals(BranchType.AT)) { + return lockManager.acquireLock(this, autoCommit, skipCheckLock); } return true; } @Override public boolean unlock() throws TransactionException { - if (this.getBranchType() == BranchType.AT) { - return LockerManagerFactory.getLockManager().releaseLock(this); + if (this.branchType == BranchType.AT) { + return lockManager.releaseLock(this); } return true; } @@ -351,7 +363,7 @@ public byte[] encode() { ByteBuffer byteBuffer = byteBufferThreadLocal.get(); //recycle - BufferUtils.clear(byteBuffer); + byteBuffer.clear(); byteBuffer.putLong(transactionId); byteBuffer.putLong(branchId); diff --git a/server/src/main/java/io/seata/server/session/GlobalSession.java b/server/src/main/java/io/seata/server/session/GlobalSession.java index 29b71e80832..afe19b2260a 100644 --- a/server/src/main/java/io/seata/server/session/GlobalSession.java +++ b/server/src/main/java/io/seata/server/session/GlobalSession.java @@ -26,13 +26,13 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import io.seata.common.ConfigurationKeys; import io.seata.common.Constants; import io.seata.common.DefaultValues; import io.seata.common.XID; import io.seata.common.util.BufferUtils; import io.seata.common.util.StringUtils; import io.seata.config.ConfigurationFactory; -import io.seata.core.constants.ConfigurationKeys; import io.seata.core.exception.GlobalTransactionException; import io.seata.core.exception.TransactionException; import io.seata.core.exception.TransactionExceptionCode; @@ -42,6 +42,7 @@ import io.seata.core.model.LockStatus; import io.seata.server.UUIDGenerator; import io.seata.server.lock.LockerManagerFactory; +import io.seata.server.cluster.raft.RaftServerFactory; import io.seata.server.store.SessionStorable; import io.seata.server.store.StoreConfig; import org.slf4j.Logger; @@ -66,6 +67,12 @@ public class GlobalSession implements SessionLifecycle, SessionStorable { private static ThreadLocal byteBufferThreadLocal = ThreadLocal.withInitial(() -> ByteBuffer.allocate( MAX_GLOBAL_SESSION_SIZE)); + /** + * ThreadLocal should be optimize. + * It is tied to the current threading model. threadlocal's public set method does nothing to protect it from abuse. + */ + private static final ThreadLocal EXPECTED_STATUS_THREAD_LOCAL = new ThreadLocal<>(); + /** * If the global session's status is (Rollbacking or Committing) and currentTime - createTime >= RETRY_DEAD_THRESHOLD * then the tx will be remand as need to retry rollback @@ -99,6 +106,7 @@ public class GlobalSession implements SessionLifecycle, SessionStorable { private GlobalSessionLock globalSessionLock = new GlobalSessionLock(); + private Set lifecycleListeners = new HashSet<>(2); /** * Add boolean. @@ -122,10 +130,20 @@ public boolean add(BranchSession branchSession) { * @return the boolean */ public boolean remove(BranchSession branchSession) { - return branchSessions.remove(branchSession); + synchronized (this) { + return branchSessions.remove(branchSession); + } } - private Set lifecycleListeners = new HashSet<>(); + /** + * Remove boolean. + * + * @param branchId the long + * @return the boolean + */ + public boolean remove(Long branchId) { + return this.remove(this.getBranch(branchId)); + } /** * Can be committed async boolean. @@ -194,6 +212,7 @@ public void begin() throws TransactionException { this.status = GlobalStatus.Begin; this.beginTime = System.currentTimeMillis(); this.active = true; + SessionHolder.getRootSessionManager().onBegin(this); for (SessionLifecycleListener lifecycleListener : lifecycleListeners) { lifecycleListener.onBegin(this); } @@ -201,9 +220,11 @@ public void begin() throws TransactionException { @Override public void changeGlobalStatus(GlobalStatus status) throws TransactionException { - if (GlobalStatus.Rollbacking == status) { + if (GlobalStatus.Rollbacking == status || GlobalStatus.TimeoutRollbacking == status) { LockerManagerFactory.getLockManager().updateLockStatus(xid, LockStatus.Rollbacking); } + SessionHolder.getRootSessionManager().onStatusChange(this, status); + // set session status after update successfully this.status = status; for (SessionLifecycleListener lifecycleListener : lifecycleListeners) { lifecycleListener.onStatusChange(this, status); @@ -211,9 +232,8 @@ public void changeGlobalStatus(GlobalStatus status) throws TransactionException } @Override - public void changeBranchStatus(BranchSession branchSession, BranchStatus status) - throws TransactionException { - branchSession.setStatus(status); + public void changeBranchStatus(BranchSession branchSession, BranchStatus status) throws TransactionException { + SessionHolder.getRootSessionManager().onBranchStatusChange(this, branchSession, status); for (SessionLifecycleListener lifecycleListener : lifecycleListeners) { lifecycleListener.onBranchStatusChange(this, branchSession, status); } @@ -227,6 +247,7 @@ public boolean isActive() { @Override public void close() throws TransactionException { if (active) { + SessionHolder.getRootSessionManager().onClose(this); for (SessionLifecycleListener lifecycleListener : lifecycleListeners) { lifecycleListener.onClose(this); } @@ -236,12 +257,15 @@ public void close() throws TransactionException { @Override public void end() throws TransactionException { if (GlobalStatus.isTwoPhaseSuccess(status)) { + // TODO: Non AT mode does not need to be unlocked // Clean locks first clean(); + SessionHolder.getRootSessionManager().onSuccessEnd(this); for (SessionLifecycleListener lifecycleListener : lifecycleListeners) { lifecycleListener.onSuccessEnd(this); } } else { + SessionHolder.getRootSessionManager().onFailEnd(this); for (SessionLifecycleListener lifecycleListener : lifecycleListeners) { lifecycleListener.onFailEnd(this); } @@ -286,11 +310,13 @@ public void removeSessionLifecycleListener(SessionLifecycleListener sessionLifec @Override public void addBranch(BranchSession branchSession) throws TransactionException { + SessionHolder.getRootSessionManager().onAddBranch(this, branchSession); for (SessionLifecycleListener lifecycleListener : lifecycleListeners) { lifecycleListener.onAddBranch(this, branchSession); } - branchSession.setStatus(BranchStatus.Registered); - add(branchSession); + if (!RaftServerFactory.getInstance().isRaftMode()) { + add(branchSession); + } } public void loadBranchs() { @@ -309,19 +335,25 @@ public void loadBranchs() { public void unlockBranch(BranchSession branchSession) throws TransactionException { // do not unlock if global status in (Committing, CommitRetrying, AsyncCommitting), // because it's already unlocked in 'DefaultCore.commit()' - if (status != Committing && status != CommitRetrying && status != AsyncCommitting) { + if (this.status != Committing && this.status != CommitRetrying && this.status != AsyncCommitting) { if (!branchSession.unlock()) { - throw new TransactionException("Unlock branch lock failed, xid = " + this.xid + ", branchId = " + branchSession.getBranchId()); + throw new TransactionException( + "Unlock branch lock failed, xid = " + this.xid + ", branchId = " + branchSession.getBranchId()); } } } @Override public void removeBranch(BranchSession branchSession) throws TransactionException { + SessionHolder.getRootSessionManager().onRemoveBranch(this, branchSession); for (SessionLifecycleListener lifecycleListener : lifecycleListeners) { lifecycleListener.onRemoveBranch(this, branchSession); } - remove(branchSession); + + if (!RaftServerFactory.getInstance().isRaftMode()) { + this.remove(branchSession); + } + } @Override @@ -586,7 +618,7 @@ public byte[] encode() { } ByteBuffer byteBuffer = byteBufferThreadLocal.get(); //recycle - BufferUtils.clear(byteBuffer); + byteBuffer.clear(); byteBuffer.putLong(transactionId); byteBuffer.putInt(timeout); @@ -620,7 +652,6 @@ public byte[] encode() { } else { byteBuffer.putInt(0); } - byteBuffer.putLong(beginTime); byteBuffer.put((byte)status.getCode()); BufferUtils.flip(byteBuffer); @@ -631,7 +662,7 @@ public byte[] encode() { private int calGlobalSessionSize(byte[] byApplicationIdBytes, byte[] byServiceGroupBytes, byte[] byTxNameBytes, byte[] xidBytes, byte[] applicationDataBytes) { - final int size = 8 // transactionId + return 8 // transactionId + 4 // timeout + 2 // byApplicationIdBytes.length + 2 // byServiceGroupBytes.length @@ -645,7 +676,6 @@ private int calGlobalSessionSize(byte[] byApplicationIdBytes, byte[] byServiceGr + (byTxNameBytes == null ? 0 : byTxNameBytes.length) + (xidBytes == null ? 0 : xidBytes.length) + (applicationDataBytes == null ? 0 : applicationDataBytes.length); - return size; } @Override @@ -746,26 +776,34 @@ public List getBranchSessions() { } public void asyncCommit() throws TransactionException { - this.addSessionLifecycleListener(SessionHolder.getAsyncCommittingSessionManager()); - this.setStatus(GlobalStatus.AsyncCommitting); - SessionHolder.getAsyncCommittingSessionManager().addGlobalSession(this); + changeGlobalStatus(GlobalStatus.AsyncCommitting); } public void queueToRetryCommit() throws TransactionException { - this.addSessionLifecycleListener(SessionHolder.getRetryCommittingSessionManager()); - this.setStatus(GlobalStatus.CommitRetrying); - SessionHolder.getRetryCommittingSessionManager().addGlobalSession(this); + changeGlobalStatus(GlobalStatus.CommitRetrying); } public void queueToRetryRollback() throws TransactionException { - this.addSessionLifecycleListener(SessionHolder.getRetryRollbackingSessionManager()); GlobalStatus currentStatus = this.getStatus(); + GlobalStatus newStatus; if (SessionStatusValidator.isTimeoutGlobalStatus(currentStatus)) { - this.setStatus(GlobalStatus.TimeoutRollbackRetrying); + newStatus = GlobalStatus.TimeoutRollbackRetrying; } else { - this.setStatus(GlobalStatus.RollbackRetrying); + newStatus = GlobalStatus.RollbackRetrying; } - SessionHolder.getRetryRollbackingSessionManager().addGlobalSession(this); + changeGlobalStatus(newStatus); + } + + public void setExpectedStatusFromCurrent() { + EXPECTED_STATUS_THREAD_LOCAL.set(this.status); + } + + public void cleanExpectedStatus() { + EXPECTED_STATUS_THREAD_LOCAL.remove(); + } + + public GlobalStatus getExpectedStatus() { + return EXPECTED_STATUS_THREAD_LOCAL.get(); } @Override