Skip to content

Commit

Permalink
Merge branch '2.x' into 1127
Browse files Browse the repository at this point in the history
  • Loading branch information
funky-eyes authored Nov 27, 2023
2 parents 45ed5ee + e32cb1c commit 22e6e37
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 36 deletions.
2 changes: 2 additions & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6075](https://github.com/seata/seata/pull/6075)] fix missing table alias for on update column of image SQL

### optimize:
- [[#6061](https://github.com/seata/seata/pull/6061)] merge the rpcMergeMessageSend threads of rm and tm and increase the thread hibernation duration
- [[#6031](https://github.com/seata/seata/pull/6031)] add a check for the existence of the undolog table

### security:
Expand All @@ -21,6 +22,7 @@ Thanks to these contributors for their code commits. Please report an unintended

<!-- Please make sure your Github ID is in the list below -->
- [slievrly](https://github.com/slievrly)
- [yiqi](https://github.com/PleaseGiveMeTheCoke)
- [ptyin](https://github.com/ptyin)
- [laywin](https://github.com/laywin)
- [imcmai](https://github.com/imcmai)
Expand Down
3 changes: 3 additions & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
- [[#6075](https://github.com/seata/seata/pull/6075)] 修复镜像SQL对于on update列没有添加表别名的问题

### optimize:
- [[#6061](https://github.com/seata/seata/pull/6061)] 合并rm和tm的rpcMergeMessageSend线程,增加线程休眠时长
- [[#6031](https://github.com/seata/seata/pull/6031)] 添加undo_log表的存在性校验


### security:
- [[#6069](https://github.com/seata/seata/pull/6069)] 升级Guava依赖版本,修复安全漏洞

Expand All @@ -21,6 +23,7 @@

<!-- 请确保您的 GitHub ID 在以下列表中 -->
- [slievrly](https://github.com/slievrly)
- [yiqi](https://github.com/PleaseGiveMeTheCoke)
- [ptyin](https://github.com/ptyin)
- [laywin](https://github.com/laywin)
- [imcmai](https://github.com/imcmai)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public abstract class AbstractNettyRemoting implements Disposable {
/**
* The Is sending.
*/
protected volatile boolean isSending = false;
protected static volatile boolean isSending = false;
private String group = "DEFAULT";

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
Expand Down Expand Up @@ -77,14 +79,13 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting
private static final String MSG_ID_PREFIX = "msgId:";
private static final String FUTURES_PREFIX = "futures:";
private static final String SINGLE_LOG_POSTFIX = ";";
private static final int MAX_MERGE_SEND_MILLS = 1;
private static final String THREAD_PREFIX_SPLIT_CHAR = "_";
private static final int MAX_MERGE_SEND_MILLS = 10;
private static final int MAX_MERGE_SEND_THREAD = 1;
private static final long KEEP_ALIVE_TIME = Integer.MAX_VALUE;
private static final long SCHEDULE_DELAY_MILLS = 60 * 1000L;
private static final long SCHEDULE_INTERVAL_MILLS = 10 * 1000L;
private static final String MERGE_THREAD_PREFIX = "rpcMergeMessageSend";
protected final Object mergeLock = new Object();
private static final String MERGE_THREAD_NAME = "rpcMergeMessageSend";
protected static final Object MERGE_LOCK = new Object();

/**
* When sending message type is {@link MergeMessage}, will be stored to mergeMsgMap.
Expand All @@ -96,29 +97,39 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting
* Send via asynchronous thread {@link io.seata.core.rpc.netty.AbstractNettyRemotingClient.MergedSendRunnable}
* {@link AbstractNettyRemotingClient#isEnableClientBatchSendRequest()}
*/
protected final ConcurrentHashMap<String/*serverAddress*/, BlockingQueue<RpcMessage>> basketMap = new ConcurrentHashMap<>();
protected static final ConcurrentHashMap<String/*serverAddress*/, Map<NettyPoolKey.TransactionRole, BlockingQueue<RpcMessage>>> BASKET_MAP = new ConcurrentHashMap<>();
private final NettyClientBootstrap clientBootstrap;
private final NettyClientChannelManager clientChannelManager;
private final NettyPoolKey.TransactionRole transactionRole;
private ExecutorService mergeSendExecutorService;
private static volatile ExecutorService mergeSendExecutorService;
private TransactionMessageHandler transactionMessageHandler;
protected volatile boolean enableClientBatchSendRequest;

@Override
public void init() {
timerExecutor.scheduleAtFixedRate(() -> clientChannelManager.reconnect(getTransactionServiceGroup()), SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
if (this.isEnableClientBatchSendRequest()) {
mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
startMergeSendThread();
}
super.init();
clientBootstrap.start();
}

private void startMergeSendThread() {
if (mergeSendExecutorService == null) {
synchronized (AbstractNettyRemoting.class) {
if (mergeSendExecutorService == null) {
mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(MERGE_THREAD_NAME, MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
}
}
}
}

public AbstractNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup,
ThreadPoolExecutor messageExecutor, NettyPoolKey.TransactionRole transactionRole) {
super(messageExecutor);
Expand Down Expand Up @@ -146,8 +157,14 @@ public Object sendSyncRequest(Object msg) throws TimeoutException {
futures.put(rpcMessage.getId(), messageFuture);

// put message into basketMap
BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,
key -> new LinkedBlockingQueue<>());
Map<NettyPoolKey.TransactionRole, BlockingQueue<RpcMessage>> roleMessage = CollectionUtils.computeIfAbsent(BASKET_MAP, serverAddress,
key -> {
Map<NettyPoolKey.TransactionRole, BlockingQueue<RpcMessage>> map = new HashMap<>(2);
map.put(NettyPoolKey.TransactionRole.TMROLE, new LinkedBlockingQueue<>());
map.put(NettyPoolKey.TransactionRole.RMROLE, new LinkedBlockingQueue<>());
return map;
});
BlockingQueue<RpcMessage> basket = roleMessage.get(transactionRole);
if (!basket.offer(rpcMessage)) {
LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}",
serverAddress, rpcMessage);
Expand All @@ -157,8 +174,8 @@ public Object sendSyncRequest(Object msg) throws TimeoutException {
LOGGER.debug("offer message: {}", rpcMessage.getBody());
}
if (!isSending) {
synchronized (mergeLock) {
mergeLock.notifyAll();
synchronized (MERGE_LOCK) {
MERGE_LOCK.notifyAll();
}
}

Expand Down Expand Up @@ -291,10 +308,6 @@ protected String getXid(Object msg) {
return StringUtils.isBlank(xid) ? String.valueOf(ThreadLocalRandom.current().nextLong(Long.MAX_VALUE)) : xid;
}

private String getThreadPrefix() {
return AbstractNettyRemotingClient.MERGE_THREAD_PREFIX + THREAD_PREFIX_SPLIT_CHAR + transactionRole.name();
}

/**
* Get pool key function.
*
Expand Down Expand Up @@ -326,59 +339,72 @@ private String getThreadPrefix() {
/**
* The type Merged send runnable.
*/
private class MergedSendRunnable implements Runnable {
private static class MergedSendRunnable implements Runnable {

@Override
public void run() {
while (true) {
synchronized (mergeLock) {
try {
mergeLock.wait(MAX_MERGE_SEND_MILLS);
} catch (InterruptedException e) {
if (BASKET_MAP.values().stream().allMatch(map -> map.values().stream().allMatch(Collection::isEmpty))) {
synchronized (MERGE_LOCK) {
if (BASKET_MAP.values().stream().allMatch(map -> map.values().stream().allMatch(Collection::isEmpty))) {
try {
MERGE_LOCK.wait(MAX_MERGE_SEND_MILLS);
} catch (InterruptedException e) {
}
}
}
}
isSending = true;
basketMap.forEach((address, basket) -> {
BASKET_MAP.forEach((address, roleMessage) -> roleMessage.forEach((role, basket) -> {
if (basket.isEmpty()) {
return;
}

AbstractNettyRemotingClient client;
if (role.equals(NettyPoolKey.TransactionRole.RMROLE)) {
client = RmNettyRemotingClient.getInstance();
} else {
client = TmNettyRemotingClient.getInstance();
}

ConcurrentHashMap<Integer, MessageFuture> clientFutures = client.getFutures();

MergedWarpMessage mergeMessage = new MergedWarpMessage();
while (!basket.isEmpty()) {
RpcMessage msg = basket.poll();
mergeMessage.msgs.add((AbstractMessage) msg.getBody());
mergeMessage.msgIds.add(msg.getId());
}
if (mergeMessage.msgIds.size() > 1) {
printMergeMessageLog(mergeMessage);
printMergeMessageLog(clientFutures, mergeMessage);
}
Channel sendChannel = null;
try {
// send batch message is sync request, but there is no need to get the return value.
// Since the messageFuture has been created before the message is placed in basketMap,
// the return value will be obtained in ClientOnResponseProcessor.
sendChannel = clientChannelManager.acquireChannel(address);
AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage);
sendChannel = client.getClientChannelManager().acquireChannel(address);
client.sendAsyncRequest(sendChannel, mergeMessage);
} catch (FrameworkException e) {
if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) {
destroyChannel(address, sendChannel);
client.destroyChannel(address, sendChannel);
}
// fast fail
for (Integer msgId : mergeMessage.msgIds) {
MessageFuture messageFuture = futures.remove(msgId);
MessageFuture messageFuture = clientFutures.remove(msgId);
if (messageFuture != null) {
messageFuture.setResultMessage(
new RuntimeException(String.format("%s is unreachable", address), e));
new RuntimeException(String.format("%s is unreachable", address), e));
}
}
LOGGER.error("client merge call failed: {}", e.getMessage(), e);
}
});
}));
isSending = false;
}
}

private void printMergeMessageLog(MergedWarpMessage mergeMessage) {
private void printMergeMessageLog(ConcurrentHashMap<Integer, MessageFuture> clientFutures, MergedWarpMessage mergeMessage) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("merge msg size:{}", mergeMessage.msgIds.size());
for (AbstractMessage cm : mergeMessage.msgs) {
Expand All @@ -389,7 +415,7 @@ private void printMergeMessageLog(MergedWarpMessage mergeMessage) {
sb.append(MSG_ID_PREFIX).append(l).append(SINGLE_LOG_POSTFIX);
}
sb.append("\n");
for (long l : futures.keySet()) {
for (long l : clientFutures.keySet()) {
sb.append(FUTURES_PREFIX).append(l).append(SINGLE_LOG_POSTFIX);
}
LOGGER.debug(sb.toString());
Expand Down

0 comments on commit 22e6e37

Please sign in to comment.