Skip to content

Commit

Permalink
Merge branch 'develop' into fix_3946
Browse files Browse the repository at this point in the history
  • Loading branch information
chiangcho authored Jan 6, 2022
2 parents 3302067 + 7b11cb5 commit 7a60018
Show file tree
Hide file tree
Showing 24 changed files with 204 additions and 140 deletions.
7 changes: 5 additions & 2 deletions changes/1.5.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ Seata 是一款开源的分布式事务解决方案,提供高性能和简单
- [[#4191](https://github.com/seata/seata/pull/4191)] RPC请求超时时间支持配置化
- [[#4212](https://github.com/seata/seata/pull/4212)] 控制台接口合并优化
- [[#4216](https://github.com/seata/seata/pull/4216)] 非AT用户无须清理undolog表
- [[#4237](https://github.com/seata/seata/pull/4237)] 当所有的before image均为空的时候,跳过checkLock的步骤
- [[#4251](https://github.com/seata/seata/pull/4251)] 优化部分代码处理
- [[#4262](https://github.com/seata/seata/pull/4262)] 优化 tcc 模块代码处理


### test:
Expand Down Expand Up @@ -178,8 +180,8 @@ Seata 是一款开源的分布式事务解决方案,提供高性能和简单
- [iqinning](https://github.com/iqinning)
- [zhaoyuguang](https://github.com/zhaoyuguang)
- [yujianfei1986](https://github.com/yujianfei1986)
- [yujianfei1986](https://github.com/yujianfei1986))
- [jsbxyyx](https://github.com/jsbxyyx))
- [yujianfei1986](https://github.com/yujianfei1986)
- [jsbxyyx](https://github.com/jsbxyyx)
- [lvekee](https://github.com/lvekee)
- [elrond-g](https://github.com/elrond-g)
- [Rubbernecker](https://github.com/Rubbernecker)
Expand All @@ -192,6 +194,7 @@ Seata 是一款开源的分布式事务解决方案,提供高性能和简单
- [xujj](https://github.com/XBNGit)
- [portman](https://github.com/iportman)
- [lcmvs](https://github.com/lcmvs)
- [pengten](https://github.com/pengten)

同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。

Expand Down
3 changes: 3 additions & 0 deletions changes/en-us/1.5.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@
- [[#4191](https://github.com/seata/seata/pull/4191)] support rpc timeout can be customized.
- [[#4212](https://github.com/seata/seata/pull/4212)] optimize the interface of the console
- [[#4216](https://github.com/seata/seata/pull/4216)] no more attempt to clean undolog for none AT user
- [[#4237](https://github.com/seata/seata/pull/4237)] skip check lock when all the before image is empty
- [[#4251](https://github.com/seata/seata/pull/4251)] optimize partial code handling
- [[#4262](https://github.com/seata/seata/pull/4262)] optimize tcc module code handling


### test:
Expand Down Expand Up @@ -191,6 +193,7 @@
- [xujj](https://github.com/XBNGit)
- [portman](https://github.com/iportman)
- [lcmvs](https://github.com/lcmvs)
- [pengten](https://github.com/pengten)

Also, we receive many valuable issues, questions and advices from our community. Thanks for you all.

Expand Down
5 changes: 5 additions & 0 deletions common/src/main/java/io/seata/common/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,9 @@ public interface Constants {
*/
String AUTO_COMMIT = "autoCommit";

/**
* The constant SKIP_CHECK_LOCK
*/
String SKIP_CHECK_LOCK = "skipCheckLock";

}
2 changes: 1 addition & 1 deletion core/src/main/java/io/seata/core/lock/LocalDBLocker.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public boolean acquireLock(List<RowLock> rowLock) {
}

@Override
public boolean acquireLock(List<RowLock> rowLock, boolean autoCommit) {
public boolean acquireLock(List<RowLock> rowLock, boolean autoCommit, boolean skipCheckLock) {
return false;
}

Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/io/seata/core/lock/Locker.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ public interface Locker {
*
* @param rowLock the row lock
* @param autoCommit the auto commit
* @param skipCheckLock whether skip check lock or not
* @return the boolean
*/
boolean acquireLock(List<RowLock> rowLock, boolean autoCommit);
boolean acquireLock(List<RowLock> rowLock, boolean autoCommit, boolean skipCheckLock);

/**
* Release lock boolean.
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/io/seata/core/store/LockStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ public interface LockStore {
*
* @param lockDOs the lock d os
* @param autoCommit the auto commit
* @param skipCheckLock whether skip check lock or not
* @return the boolean
*/
boolean acquireLock(List<LockDO> lockDOs, boolean autoCommit);
boolean acquireLock(List<LockDO> lockDOs, boolean autoCommit, boolean skipCheckLock);

/**
* Un lock boolean.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@


import static io.seata.common.Constants.AUTO_COMMIT;
import static io.seata.common.Constants.SKIP_CHECK_LOCK;

/**
* The type Connection context.
Expand All @@ -62,7 +63,7 @@ public String getSavepointName() throws SQLException {
private boolean isGlobalLockRequire;
private Savepoint currentSavepoint = DEFAULT_SAVEPOINT;
private boolean autoCommitChanged;
private Map<String, Object> applicationData = new HashMap<>(2);
private final Map<String, Object> applicationData = new HashMap<>(2, 1.0001f);

/**
* the lock keys buffer
Expand Down Expand Up @@ -277,12 +278,20 @@ public String getApplicationData() throws TransactionException {
// when transaction are enabled, it must be false
if (!autoCommit) {
this.applicationData.put(AUTO_COMMIT, autoCommit);
}

if (allBeforeImageEmpty()) {
this.applicationData.put(SKIP_CHECK_LOCK, true);
}

if (!this.applicationData.isEmpty()) {
try {
return MAPPER.writeValueAsString(this.applicationData);
} catch (JsonProcessingException e) {
throw new TransactionException(e.getMessage(), e);
}
}

return null;
}

Expand Down Expand Up @@ -365,6 +374,23 @@ private List<Savepoint> getAfterSavepoints(Savepoint savepoint) {
return new ArrayList<>(savepoints.subList(savepoints.indexOf(savepoint), savepoints.size()));
}

/**
* Check whether all the before image is empty.
*
* @return if all is empty, return true
*/
private boolean allBeforeImageEmpty() {
for (List<SQLUndoLog> sqlUndoLogs : sqlUndoItemsBuffer.values()) {
for (SQLUndoLog undoLog : sqlUndoLogs) {
if (null == undoLog.getBeforeImage() || undoLog.getBeforeImage().size() != 0) {
return false;
}
}
}

return true;
}

@Override
public String toString() {
return StringUtils.toString(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ private void register() throws TransactionException {
if (!context.hasUndoLog() || !context.hasLockKey()) {
return;
}

Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
null, context.getXid(), context.getApplicationData(), context.buildLockKeys());
context.setBranchId(branchId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,7 @@ public Object intercept(Object proxy, Method method, Object[] args, MethodProxy
String rawDataId = (String) args[0];
if (args.length == 1) {
result = get(convertDataId(rawDataId));
} else if (args.length == 2) {
result = get(convertDataId(rawDataId), args[1]);
} else if (args.length == 3) {
} else {
result = get(convertDataId(rawDataId), args[1]);
}
if (result != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ public abstract class AbstractLockManager implements LockManager {

@Override
public boolean acquireLock(BranchSession branchSession) throws TransactionException {
return acquireLock(branchSession, true);
return acquireLock(branchSession, true, false);
}

@Override
public boolean acquireLock(BranchSession branchSession, boolean autoCommit) throws TransactionException {
public boolean acquireLock(BranchSession branchSession, boolean autoCommit, boolean skipCheckLock) throws TransactionException {
if (branchSession == null) {
throw new IllegalArgumentException("branchSession can't be null for memory/file locker.");
}
Expand All @@ -62,7 +62,7 @@ public boolean acquireLock(BranchSession branchSession, boolean autoCommit) thro
// no lock
return true;
}
return getLocker(branchSession).acquireLock(locks, autoCommit);
return getLocker(branchSession).acquireLock(locks, autoCommit, skipCheckLock);
}

@Override
Expand Down
3 changes: 2 additions & 1 deletion server/src/main/java/io/seata/server/lock/LockManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ public interface LockManager {
*
* @param branchSession the branch session
* @param autoCommit the auto commit
* @param skipCheckLock whether skip check lock or not
* @return the boolean
* @throws TransactionException the transaction exception
*/
boolean acquireLock(BranchSession branchSession, boolean autoCommit) throws TransactionException;
boolean acquireLock(BranchSession branchSession, boolean autoCommit, boolean skipCheckLock) throws TransactionException;

/**
* Un lock boolean.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,12 @@ public ConcurrentMap<FileLocker.BucketLockMap, Set<String>> getLockHolder() {

@Override
public boolean lock() throws TransactionException {
return this.lock(true);
return this.lock(true, false);
}

public boolean lock(boolean autoCommit) throws TransactionException {
public boolean lock(boolean autoCommit, boolean skipCheckLock) throws TransactionException {
if (this.getBranchType().equals(BranchType.AT)) {
return LockerManagerFactory.getLockManager().acquireLock(this, autoCommit);
return LockerManagerFactory.getLockManager().acquireLock(this, autoCommit, skipCheckLock);
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class DataBaseDistributedLocker implements DistributedLocker {
private DataSource distributedLockDataSource;

/**
* weather the distribute lock demotion
* whether the distribute lock demotion
* using for 1.5.0 only and will remove in 1.6.0
*/
@Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,17 @@ public DataBaseLocker(DataSource logStoreDataSource) {

@Override
public boolean acquireLock(List<RowLock> locks) {
return acquireLock(locks, true);
return acquireLock(locks, true, false);
}

@Override
public boolean acquireLock(List<RowLock> locks, boolean autoCommit) {
public boolean acquireLock(List<RowLock> locks, boolean autoCommit, boolean skipCheckLock) {
if (CollectionUtils.isEmpty(locks)) {
// no lock
return true;
}
try {
return lockStore.acquireLock(convertToLockDO(locks), autoCommit);
return lockStore.acquireLock(convertToLockDO(locks), autoCommit, skipCheckLock);
} catch (StoreException e) {
throw e;
} catch (Exception t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ public boolean acquireLock(LockDO lockDO) {

@Override
public boolean acquireLock(List<LockDO> lockDOs) {
return acquireLock(lockDOs, true);
return acquireLock(lockDOs, true, false);
}

@Override
public boolean acquireLock(List<LockDO> lockDOs, boolean autoCommit) {
public boolean acquireLock(List<LockDO> lockDOs, boolean autoCommit, boolean skipCheckLock) {
Connection conn = null;
PreparedStatement ps = null;
ResultSet rs = null;
Expand All @@ -118,57 +118,61 @@ public boolean acquireLock(List<LockDO> lockDOs, boolean autoCommit) {
if (originalAutoCommit = conn.getAutoCommit()) {
conn.setAutoCommit(false);
}
List<LockDO> unrepeatedLockDOs = lockDOs;

//check lock
boolean canLock = true;
//query
String checkLockSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getCheckLockableSql(lockTable, lockDOs.size());
ps = conn.prepareStatement(checkLockSQL);
for (int i = 0; i < lockDOs.size(); i++) {
ps.setString(i + 1, lockDOs.get(i).getRowKey());
}
rs = ps.executeQuery();
String currentXID = lockDOs.get(0).getXid();
boolean failFast = false;
while (rs.next()) {
String dbXID = rs.getString(ServerTableColumnsName.LOCK_TABLE_XID);
if (!StringUtils.equals(dbXID, currentXID)) {
if (LOGGER.isInfoEnabled()) {
String dbPk = rs.getString(ServerTableColumnsName.LOCK_TABLE_PK);
String dbTableName = rs.getString(ServerTableColumnsName.LOCK_TABLE_TABLE_NAME);
long dbBranchId = rs.getLong(ServerTableColumnsName.LOCK_TABLE_BRANCH_ID);
LOGGER.info("Global lock on [{}:{}] is holding by xid {} branchId {}", dbTableName, dbPk, dbXID, dbBranchId);
}
if (!autoCommit) {
int status = rs.getInt(ServerTableColumnsName.LOCK_TABLE_STATUS);
if (status == LockStatus.Rollbacking.getCode()) {
failFast = true;
if (!skipCheckLock) {

boolean canLock = true;
//query
String checkLockSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getCheckLockableSql(lockTable, lockDOs.size());
ps = conn.prepareStatement(checkLockSQL);
for (int i = 0; i < lockDOs.size(); i++) {
ps.setString(i + 1, lockDOs.get(i).getRowKey());
}
rs = ps.executeQuery();
String currentXID = lockDOs.get(0).getXid();
boolean failFast = false;
while (rs.next()) {
String dbXID = rs.getString(ServerTableColumnsName.LOCK_TABLE_XID);
if (!StringUtils.equals(dbXID, currentXID)) {
if (LOGGER.isInfoEnabled()) {
String dbPk = rs.getString(ServerTableColumnsName.LOCK_TABLE_PK);
String dbTableName = rs.getString(ServerTableColumnsName.LOCK_TABLE_TABLE_NAME);
long dbBranchId = rs.getLong(ServerTableColumnsName.LOCK_TABLE_BRANCH_ID);
LOGGER.info("Global lock on [{}:{}] is holding by xid {} branchId {}", dbTableName, dbPk, dbXID, dbBranchId);
}
if (!autoCommit) {
int status = rs.getInt(ServerTableColumnsName.LOCK_TABLE_STATUS);
if (status == LockStatus.Rollbacking.getCode()) {
failFast = true;
}
}
canLock = false;
break;
}
canLock = false;
break;
}

dbExistedRowKeys.add(rs.getString(ServerTableColumnsName.LOCK_TABLE_ROW_KEY));
}
if (!canLock) {
conn.rollback();
if (failFast) {
throw new StoreException(new BranchTransactionException(LockKeyConflictFailFast));
dbExistedRowKeys.add(rs.getString(ServerTableColumnsName.LOCK_TABLE_ROW_KEY));
}
if (!canLock) {
conn.rollback();
if (failFast) {
throw new StoreException(new BranchTransactionException(LockKeyConflictFailFast));
}
return false;
}
// If the lock has been exists in db, remove it from the lockDOs
if (CollectionUtils.isNotEmpty(dbExistedRowKeys)) {
unrepeatedLockDOs = lockDOs.stream().filter(lockDO -> !dbExistedRowKeys.contains(lockDO.getRowKey()))
.collect(Collectors.toList());
}
if (CollectionUtils.isEmpty(unrepeatedLockDOs)) {
conn.rollback();
return true;
}
return false;
}
List<LockDO> unrepeatedLockDOs = null;
if (CollectionUtils.isNotEmpty(dbExistedRowKeys)) {
unrepeatedLockDOs = lockDOs.stream().filter(lockDO -> !dbExistedRowKeys.contains(lockDO.getRowKey()))
.collect(Collectors.toList());
} else {
unrepeatedLockDOs = lockDOs;
}
if (CollectionUtils.isEmpty(unrepeatedLockDOs)) {
conn.rollback();
return true;
}
//lock

// lock
if (unrepeatedLockDOs.size() == 1) {
LockDO lockDO = unrepeatedLockDOs.get(0);
if (!doAcquireLock(conn, lockDO)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ public FileLocker(BranchSession branchSession) {

@Override
public boolean acquireLock(List<RowLock> rowLocks) {
return acquireLock(rowLocks, true);
return acquireLock(rowLocks, true, false);
}

@Override
public boolean acquireLock(List<RowLock> rowLocks, boolean autoCommit) {
public boolean acquireLock(List<RowLock> rowLocks, boolean autoCommit, boolean skipCheckLock) {
if (CollectionUtils.isEmpty(rowLocks)) {
// no lock
return true;
Expand Down
Loading

0 comments on commit 7a60018

Please sign in to comment.