Skip to content

Commit

Permalink
opt
Browse files Browse the repository at this point in the history
  • Loading branch information
funky-eyes committed Nov 27, 2023
1 parent 2a4998b commit 45ed5ee
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 37 deletions.
34 changes: 23 additions & 11 deletions server/src/main/java/io/seata/server/session/BranchSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,25 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import io.seata.common.util.CompressUtil;

import io.seata.common.util.BufferUtils;
import io.seata.common.util.CompressUtil;
import io.seata.core.exception.TransactionException;
import io.seata.core.model.BranchStatus;
import io.seata.core.model.BranchType;
import io.seata.core.model.LockStatus;
import io.seata.server.lock.LockManager;
import io.seata.server.lock.LockerManagerFactory;
import io.seata.server.storage.file.lock.FileLocker;
import io.seata.server.store.SessionStorable;
import io.seata.server.store.StoreConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import static io.seata.core.model.LockStatus.Locked;

/**
Expand Down Expand Up @@ -72,8 +74,18 @@ public class BranchSession implements Lockable, Comparable<BranchSession>, Sessi

private LockStatus lockStatus = Locked;

private ConcurrentMap<FileLocker.BucketLockMap, Set<String>> lockHolder
= new ConcurrentHashMap<>();
private final Map<FileLocker.BucketLockMap, Set<String>> lockHolder;

private final LockManager lockManager = LockerManagerFactory.getLockManager();

public BranchSession() {
lockHolder = new ConcurrentHashMap<>(2);
}

public BranchSession(BranchType branchType) {
this.branchType = branchType;
this.lockHolder = branchType == BranchType.AT ? new ConcurrentHashMap<>(8) : Collections.emptyMap();
}

/**
* Gets application data.
Expand Down Expand Up @@ -274,7 +286,7 @@ public boolean canBeCommittedAsync() {
*
* @return the lock holder
*/
public ConcurrentMap<FileLocker.BucketLockMap, Set<String>> getLockHolder() {
public Map<FileLocker.BucketLockMap, Set<String>> getLockHolder() {
return lockHolder;
}

Expand All @@ -284,16 +296,16 @@ public boolean lock() throws TransactionException {
}

public boolean lock(boolean autoCommit, boolean skipCheckLock) throws TransactionException {
if (this.getBranchType().equals(BranchType.AT)) {
return LockerManagerFactory.getLockManager().acquireLock(this, autoCommit, skipCheckLock);
if (this.branchType.equals(BranchType.AT)) {
return lockManager.acquireLock(this, autoCommit, skipCheckLock);
}
return true;
}

@Override
public boolean unlock() throws TransactionException {
if (this.getBranchType() == BranchType.AT) {
return LockerManagerFactory.getLockManager().releaseLock(this);
if (this.branchType == BranchType.AT) {
return lockManager.releaseLock(this);
}
return true;
}
Expand Down Expand Up @@ -351,7 +363,7 @@ public byte[] encode() {

ByteBuffer byteBuffer = byteBufferThreadLocal.get();
//recycle
BufferUtils.clear(byteBuffer);
byteBuffer.clear();

byteBuffer.putLong(transactionId);
byteBuffer.putLong(branchId);
Expand Down
90 changes: 64 additions & 26 deletions server/src/main/java/io/seata/server/session/GlobalSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import io.seata.common.ConfigurationKeys;
import io.seata.common.Constants;
import io.seata.common.DefaultValues;
import io.seata.common.XID;
import io.seata.common.util.BufferUtils;
import io.seata.common.util.StringUtils;
import io.seata.config.ConfigurationFactory;
import io.seata.core.constants.ConfigurationKeys;
import io.seata.core.exception.GlobalTransactionException;
import io.seata.core.exception.TransactionException;
import io.seata.core.exception.TransactionExceptionCode;
Expand All @@ -42,6 +42,7 @@
import io.seata.core.model.LockStatus;
import io.seata.server.UUIDGenerator;
import io.seata.server.lock.LockerManagerFactory;
import io.seata.server.cluster.raft.RaftServerFactory;
import io.seata.server.store.SessionStorable;
import io.seata.server.store.StoreConfig;
import org.slf4j.Logger;
Expand All @@ -66,6 +67,12 @@ public class GlobalSession implements SessionLifecycle, SessionStorable {
private static ThreadLocal<ByteBuffer> byteBufferThreadLocal = ThreadLocal.withInitial(() -> ByteBuffer.allocate(
MAX_GLOBAL_SESSION_SIZE));

/**
* ThreadLocal should be optimize.
* It is tied to the current threading model. threadlocal's public set method does nothing to protect it from abuse.
*/
private static final ThreadLocal<GlobalStatus> EXPECTED_STATUS_THREAD_LOCAL = new ThreadLocal<>();

/**
* If the global session's status is (Rollbacking or Committing) and currentTime - createTime >= RETRY_DEAD_THRESHOLD
* then the tx will be remand as need to retry rollback
Expand Down Expand Up @@ -99,6 +106,7 @@ public class GlobalSession implements SessionLifecycle, SessionStorable {

private GlobalSessionLock globalSessionLock = new GlobalSessionLock();

private Set<SessionLifecycleListener> lifecycleListeners = new HashSet<>(2);

/**
* Add boolean.
Expand All @@ -122,10 +130,20 @@ public boolean add(BranchSession branchSession) {
* @return the boolean
*/
public boolean remove(BranchSession branchSession) {
return branchSessions.remove(branchSession);
synchronized (this) {
return branchSessions.remove(branchSession);
}
}

private Set<SessionLifecycleListener> lifecycleListeners = new HashSet<>();
/**
* Remove boolean.
*
* @param branchId the long
* @return the boolean
*/
public boolean remove(Long branchId) {
return this.remove(this.getBranch(branchId));
}

/**
* Can be committed async boolean.
Expand Down Expand Up @@ -194,26 +212,28 @@ public void begin() throws TransactionException {
this.status = GlobalStatus.Begin;
this.beginTime = System.currentTimeMillis();
this.active = true;
SessionHolder.getRootSessionManager().onBegin(this);
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onBegin(this);
}
}

@Override
public void changeGlobalStatus(GlobalStatus status) throws TransactionException {
if (GlobalStatus.Rollbacking == status) {
if (GlobalStatus.Rollbacking == status || GlobalStatus.TimeoutRollbacking == status) {
LockerManagerFactory.getLockManager().updateLockStatus(xid, LockStatus.Rollbacking);
}
SessionHolder.getRootSessionManager().onStatusChange(this, status);
// set session status after update successfully
this.status = status;
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onStatusChange(this, status);
}
}

@Override
public void changeBranchStatus(BranchSession branchSession, BranchStatus status)
throws TransactionException {
branchSession.setStatus(status);
public void changeBranchStatus(BranchSession branchSession, BranchStatus status) throws TransactionException {
SessionHolder.getRootSessionManager().onBranchStatusChange(this, branchSession, status);
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onBranchStatusChange(this, branchSession, status);
}
Expand All @@ -227,6 +247,7 @@ public boolean isActive() {
@Override
public void close() throws TransactionException {
if (active) {
SessionHolder.getRootSessionManager().onClose(this);
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onClose(this);
}
Expand All @@ -236,12 +257,15 @@ public void close() throws TransactionException {
@Override
public void end() throws TransactionException {
if (GlobalStatus.isTwoPhaseSuccess(status)) {
// TODO: Non AT mode does not need to be unlocked
// Clean locks first
clean();
SessionHolder.getRootSessionManager().onSuccessEnd(this);
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onSuccessEnd(this);
}
} else {
SessionHolder.getRootSessionManager().onFailEnd(this);
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onFailEnd(this);
}
Expand Down Expand Up @@ -286,11 +310,13 @@ public void removeSessionLifecycleListener(SessionLifecycleListener sessionLifec

@Override
public void addBranch(BranchSession branchSession) throws TransactionException {
SessionHolder.getRootSessionManager().onAddBranch(this, branchSession);
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onAddBranch(this, branchSession);
}
branchSession.setStatus(BranchStatus.Registered);
add(branchSession);
if (!RaftServerFactory.getInstance().isRaftMode()) {
add(branchSession);
}
}

public void loadBranchs() {
Expand All @@ -309,19 +335,25 @@ public void loadBranchs() {
public void unlockBranch(BranchSession branchSession) throws TransactionException {
// do not unlock if global status in (Committing, CommitRetrying, AsyncCommitting),
// because it's already unlocked in 'DefaultCore.commit()'
if (status != Committing && status != CommitRetrying && status != AsyncCommitting) {
if (this.status != Committing && this.status != CommitRetrying && this.status != AsyncCommitting) {
if (!branchSession.unlock()) {
throw new TransactionException("Unlock branch lock failed, xid = " + this.xid + ", branchId = " + branchSession.getBranchId());
throw new TransactionException(
"Unlock branch lock failed, xid = " + this.xid + ", branchId = " + branchSession.getBranchId());
}
}
}

@Override
public void removeBranch(BranchSession branchSession) throws TransactionException {
SessionHolder.getRootSessionManager().onRemoveBranch(this, branchSession);
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onRemoveBranch(this, branchSession);
}
remove(branchSession);

if (!RaftServerFactory.getInstance().isRaftMode()) {
this.remove(branchSession);
}

}

@Override
Expand Down Expand Up @@ -586,7 +618,7 @@ public byte[] encode() {
}
ByteBuffer byteBuffer = byteBufferThreadLocal.get();
//recycle
BufferUtils.clear(byteBuffer);
byteBuffer.clear();

byteBuffer.putLong(transactionId);
byteBuffer.putInt(timeout);
Expand Down Expand Up @@ -620,7 +652,6 @@ public byte[] encode() {
} else {
byteBuffer.putInt(0);
}

byteBuffer.putLong(beginTime);
byteBuffer.put((byte)status.getCode());
BufferUtils.flip(byteBuffer);
Expand All @@ -631,7 +662,7 @@ public byte[] encode() {

private int calGlobalSessionSize(byte[] byApplicationIdBytes, byte[] byServiceGroupBytes, byte[] byTxNameBytes,
byte[] xidBytes, byte[] applicationDataBytes) {
final int size = 8 // transactionId
return 8 // transactionId
+ 4 // timeout
+ 2 // byApplicationIdBytes.length
+ 2 // byServiceGroupBytes.length
Expand All @@ -645,7 +676,6 @@ private int calGlobalSessionSize(byte[] byApplicationIdBytes, byte[] byServiceGr
+ (byTxNameBytes == null ? 0 : byTxNameBytes.length)
+ (xidBytes == null ? 0 : xidBytes.length)
+ (applicationDataBytes == null ? 0 : applicationDataBytes.length);
return size;
}

@Override
Expand Down Expand Up @@ -746,26 +776,34 @@ public List<BranchSession> getBranchSessions() {
}

public void asyncCommit() throws TransactionException {
this.addSessionLifecycleListener(SessionHolder.getAsyncCommittingSessionManager());
this.setStatus(GlobalStatus.AsyncCommitting);
SessionHolder.getAsyncCommittingSessionManager().addGlobalSession(this);
changeGlobalStatus(GlobalStatus.AsyncCommitting);
}

public void queueToRetryCommit() throws TransactionException {
this.addSessionLifecycleListener(SessionHolder.getRetryCommittingSessionManager());
this.setStatus(GlobalStatus.CommitRetrying);
SessionHolder.getRetryCommittingSessionManager().addGlobalSession(this);
changeGlobalStatus(GlobalStatus.CommitRetrying);
}

public void queueToRetryRollback() throws TransactionException {
this.addSessionLifecycleListener(SessionHolder.getRetryRollbackingSessionManager());
GlobalStatus currentStatus = this.getStatus();
GlobalStatus newStatus;
if (SessionStatusValidator.isTimeoutGlobalStatus(currentStatus)) {
this.setStatus(GlobalStatus.TimeoutRollbackRetrying);
newStatus = GlobalStatus.TimeoutRollbackRetrying;
} else {
this.setStatus(GlobalStatus.RollbackRetrying);
newStatus = GlobalStatus.RollbackRetrying;
}
SessionHolder.getRetryRollbackingSessionManager().addGlobalSession(this);
changeGlobalStatus(newStatus);
}

public void setExpectedStatusFromCurrent() {
EXPECTED_STATUS_THREAD_LOCAL.set(this.status);
}

public void cleanExpectedStatus() {
EXPECTED_STATUS_THREAD_LOCAL.remove();
}

public GlobalStatus getExpectedStatus() {
return EXPECTED_STATUS_THREAD_LOCAL.get();
}

@Override
Expand Down

0 comments on commit 45ed5ee

Please sign in to comment.