From d3ad06a59073503e520f0092fbeb9b4607b2877c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B1=AA=E5=BF=A0=E7=A5=A5?= Date: Tue, 5 Nov 2024 21:43:13 +0800 Subject: [PATCH 01/14] =?UTF-8?q?feature:=E6=96=B0=E5=A2=9EResourceLock?= =?UTF-8?q?=E6=9B=BF=E6=8D=A2synchronized?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/loader/EnhancedServiceLoader.java | 17 ++++-- .../seata/common/lock/ResourceLock.java | 53 +++++++++++++++++++ .../seata/common/util/UUIDGenerator.java | 5 +- .../core/rpc/netty/AbstractNettyRemoting.java | 11 ++-- .../netty/AbstractNettyRemotingClient.java | 21 ++++---- .../netty/AbstractNettyRemotingServer.java | 5 +- .../rpc/netty/NettyClientChannelManager.java | 21 +++++--- .../server/ServerOnRequestProcessor.java | 21 ++++---- .../eureka/EurekaRegistryServiceImpl.java | 9 ++-- .../tx/api/fence/hook/TccHookManager.java | 4 +- .../engine/pcext/utils/LoopContextHolder.java | 9 +++- .../engine/pcext/utils/LoopTaskUtils.java | 9 +++- 12 files changed, 140 insertions(+), 45 deletions(-) create mode 100644 common/src/main/java/org/apache/seata/common/lock/ResourceLock.java diff --git a/common/src/main/java/org/apache/seata/common/loader/EnhancedServiceLoader.java b/common/src/main/java/org/apache/seata/common/loader/EnhancedServiceLoader.java index 266e6e7c173..bffa1ec62ae 100644 --- a/common/src/main/java/org/apache/seata/common/loader/EnhancedServiceLoader.java +++ b/common/src/main/java/org/apache/seata/common/loader/EnhancedServiceLoader.java @@ -33,6 +33,7 @@ import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.seata.common.Constants; import org.apache.seata.common.executor.Initialize; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.common.util.StringUtils; import org.slf4j.Logger; @@ -235,8 +236,10 @@ private static void doUnload(InnerEnhancedServiceLoader serviceLoader, St serviceLoader.nameToDefinitionsMap.remove(activateName.toLowerCase()); if (CollectionUtils.isNotEmpty(extensionDefinitions)) { for (ExtensionDefinition definition : extensionDefinitions) { - serviceLoader.definitionToInstanceMap.remove(definition); - + InnerEnhancedServiceLoader.Holder holder = serviceLoader.definitionToInstanceMap.remove(definition); + if (holder != null) { + serviceLoader.holderLocks.remove(holder); + } } } } @@ -289,6 +292,8 @@ private static class InnerEnhancedServiceLoader { new ConcurrentHashMap<>(); private final ConcurrentMap>> nameToDefinitionsMap = new ConcurrentHashMap<>(); private final ConcurrentMap, ExtensionDefinition> classToDefinitionMap = new ConcurrentHashMap<>(); + private final ConcurrentMap,ResourceLock> holderLocks = new ConcurrentHashMap<>(); + private final ResourceLock resourceLock = new ResourceLock(); private InnerEnhancedServiceLoader(Class type) { this.type = type; @@ -320,6 +325,9 @@ private static InnerEnhancedServiceLoader removeServiceLoader(Class ty } private static void removeAllServiceLoader() { + SERVICE_LOADERS.values().forEach(loader -> { + loader.holderLocks.clear(); + }); SERVICE_LOADERS.clear(); } @@ -472,7 +480,8 @@ private S getExtensionInstance(ExtensionDefinition definition, ClassLoader lo key -> new Holder<>()); Object instance = holder.get(); if (instance == null) { - synchronized (holder) { + ResourceLock lock = CollectionUtils.computeIfAbsent(holderLocks, holder, key -> new ResourceLock()); + try (ResourceLock ignored = lock.obtain()) { instance = holder.get(); if (instance == null) { instance = createNewExtension(definition, loader, argTypes, args); @@ -499,7 +508,7 @@ private S createNewExtension(ExtensionDefinition definition, ClassLoader load private List> loadAllExtensionClass(ClassLoader loader, boolean includeCompatible) { List> definitions = definitionsHolder.get(); if (definitions == null) { - synchronized (definitionsHolder) { + try (ResourceLock ignored = resourceLock.obtain()) { definitions = definitionsHolder.get(); if (definitions == null) { definitions = findAllExtensionDefinition(loader, includeCompatible); diff --git a/common/src/main/java/org/apache/seata/common/lock/ResourceLock.java b/common/src/main/java/org/apache/seata/common/lock/ResourceLock.java new file mode 100644 index 00000000000..311d0208670 --- /dev/null +++ b/common/src/main/java/org/apache/seata/common/lock/ResourceLock.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.common.lock; + +import java.util.concurrent.locks.ReentrantLock; + +/** + * The ResourceLock extends ReentrantLock and implements AutoCloseable, + * allowing it to be used in try-with-resources blocks without needing + * to unlock in a finally block. + * + *

Example

+ *
+ * {@code
+ *   private final ResourceLock resourceLock = new ResourceLock();
+ *   try (ResourceLock lock = resourceLock.obtain()) {
+ *     // do something while holding the resource lock
+ *   }
+ * }
+ * 
+ */ +public class ResourceLock extends ReentrantLock implements AutoCloseable { + + /** + * Obtain the lock. + * + * @return this ResourceLock + */ + public ResourceLock obtain() { + lock(); + return this; + } + + + @Override + public void close() { + this.unlock(); + } +} diff --git a/common/src/main/java/org/apache/seata/common/util/UUIDGenerator.java b/common/src/main/java/org/apache/seata/common/util/UUIDGenerator.java index 542de3ed1eb..9316c3011f7 100644 --- a/common/src/main/java/org/apache/seata/common/util/UUIDGenerator.java +++ b/common/src/main/java/org/apache/seata/common/util/UUIDGenerator.java @@ -16,12 +16,15 @@ */ package org.apache.seata.common.util; +import org.apache.seata.common.lock.ResourceLock; + /** * The type Uuid generator. */ public class UUIDGenerator { private static volatile IdWorker idWorker; + private final static ResourceLock resourceLock = new ResourceLock(); /** * generate UUID using snowflake algorithm @@ -30,7 +33,7 @@ public class UUIDGenerator { */ public static long generateUUID() { if (idWorker == null) { - synchronized (UUIDGenerator.class) { + try (ResourceLock ignored = resourceLock.obtain()) { if (idWorker == null) { init(null); } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemoting.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemoting.java index 3d09108ac43..ea899182fa2 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemoting.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemoting.java @@ -30,12 +30,15 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.Condition; + import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import org.apache.seata.common.exception.FrameworkErrorCode; import org.apache.seata.common.exception.FrameworkException; import org.apache.seata.common.loader.EnhancedServiceLoader; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.common.thread.NamedThreadFactory; import org.apache.seata.common.thread.PositiveAtomicCounter; import org.apache.seata.core.protocol.MessageFuture; @@ -88,7 +91,8 @@ public abstract class AbstractNettyRemoting implements Disposable { */ protected volatile long nowMills = 0; private static final int TIMEOUT_CHECK_INTERVAL = 3000; - protected final Object lock = new Object(); + protected final ResourceLock resourceLock = new ResourceLock(); + protected final Condition condition = resourceLock.newCondition(); /** * The Is sending. */ @@ -119,7 +123,6 @@ public void run() { } } } - nowMills = System.currentTimeMillis(); } }, TIMEOUT_CHECK_INTERVAL, TIMEOUT_CHECK_INTERVAL, TimeUnit.MILLISECONDS); @@ -343,7 +346,7 @@ protected String getAddressFromChannel(Channel channel) { private void channelWritableCheck(Channel channel, Object msg) { int tryTimes = 0; - synchronized (lock) { + try (ResourceLock ignored = resourceLock.obtain()){ while (!channel.isWritable()) { try { tryTimes++; @@ -352,7 +355,7 @@ private void channelWritableCheck(Channel channel, Object msg) { throw new FrameworkException("msg:" + ((msg == null) ? "null" : msg.toString()), FrameworkErrorCode.ChannelIsNotWritable); } - lock.wait(NOT_WRITEABLE_CHECK_MILLS); + condition.await(NOT_WRITEABLE_CHECK_MILLS, TimeUnit.MILLISECONDS); } catch (InterruptedException exx) { LOGGER.error(exx.getMessage()); } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java index bbbab50faa5..c8634ef04ef 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java @@ -28,6 +28,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.Condition; import java.util.function.Function; import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; @@ -39,6 +40,7 @@ import io.netty.util.concurrent.EventExecutorGroup; import org.apache.seata.common.exception.FrameworkErrorCode; import org.apache.seata.common.exception.FrameworkException; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.common.thread.NamedThreadFactory; import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.common.util.NetUtil; @@ -82,7 +84,8 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting private static final long SCHEDULE_DELAY_MILLS = 60 * 1000L; private static final long SCHEDULE_INTERVAL_MILLS = 10 * 1000L; private static final String MERGE_THREAD_PREFIX = "rpcMergeMessageSend"; - protected final Object mergeLock = new Object(); + private final ResourceLock mergerLock = new ResourceLock(); + private final Condition mergeCondition= mergerLock.newCondition(); /** * When sending message type is {@link MergeMessage}, will be stored to mergeMsgMap. @@ -163,8 +166,8 @@ public Object sendSyncRequest(Object msg) throws TimeoutException { LOGGER.debug("offer message: {}", rpcMessage.getBody()); } if (!isSending) { - synchronized (mergeLock) { - mergeLock.notifyAll(); + try (ResourceLock ignored = mergerLock.obtain()){ + mergeCondition.notifyAll(); } } @@ -344,11 +347,9 @@ private class MergedSendRunnable implements Runnable { @Override public void run() { while (true) { - synchronized (mergeLock) { - try { - mergeLock.wait(MAX_MERGE_SEND_MILLS); - } catch (InterruptedException e) { - } + try (ResourceLock ignored = mergerLock.obtain()){ + mergeCondition.await(MAX_MERGE_SEND_MILLS, TimeUnit.MILLISECONDS); + } catch (InterruptedException ignored) { } isSending = true; basketMap.forEach((address, basket) -> { @@ -431,9 +432,9 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) { - synchronized (lock) { + try (ResourceLock ignored = resourceLock.obtain()){ if (ctx.channel().isWritable()) { - lock.notifyAll(); + condition.signalAll(); } } ctx.fireChannelWritabilityChanged(); diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java index 67df2ea8494..7399dd8049e 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java @@ -28,6 +28,7 @@ import io.netty.handler.codec.DecoderException; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.common.util.NetUtil; import org.apache.seata.common.util.StringUtils; import org.apache.seata.core.protocol.AbstractMessage; @@ -177,9 +178,9 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) { - synchronized (lock) { + try (ResourceLock ignored = resourceLock.obtain()){ if (ctx.channel().isWritable()) { - lock.notifyAll(); + condition.signalAll(); } } ctx.fireChannelWritabilityChanged(); diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientChannelManager.java b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientChannelManager.java index 7be0de2e729..bb748735a5a 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientChannelManager.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientChannelManager.java @@ -33,6 +33,7 @@ import org.apache.seata.common.ConfigurationKeys; import org.apache.seata.common.exception.FrameworkErrorCode; import org.apache.seata.common.exception.FrameworkException; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.common.util.NetUtil; import org.apache.seata.common.util.StringUtils; @@ -51,7 +52,7 @@ class NettyClientChannelManager { private static final Logger LOGGER = LoggerFactory.getLogger(NettyClientChannelManager.class); - private final ConcurrentMap channelLocks = new ConcurrentHashMap<>(); + private final ConcurrentMap resourceLocks = new ConcurrentHashMap<>(); private final ConcurrentMap poolKeyMap = new ConcurrentHashMap<>(); @@ -105,8 +106,8 @@ Channel acquireChannel(String serverAddress) { if (LOGGER.isInfoEnabled()) { LOGGER.info("will connect to {}", serverAddress); } - Object lockObj = CollectionUtils.computeIfAbsent(channelLocks, serverAddress, key -> new Object()); - synchronized (lockObj) { + ResourceLock lock = CollectionUtils.computeIfAbsent(resourceLocks, serverAddress, key -> new ResourceLock()); + try (ResourceLock ignored = lock.obtain()){ return doConnect(serverAddress); } } @@ -118,9 +119,12 @@ Channel acquireChannel(String serverAddress) { * @param serverAddress server address */ void releaseChannel(Channel channel, String serverAddress) { - if (channel == null || serverAddress == null) { return; } - try { - synchronized (channelLocks.get(serverAddress)) { + if (channel == null || serverAddress == null) { + return; + } + ResourceLock lock = resourceLocks.get(serverAddress); + if (lock != null) { + try (ResourceLock ignored = lock.obtain()) { Channel ch = channels.get(serverAddress); if (ch == null) { nettyClientKeyPool.returnObject(poolKeyMap.get(serverAddress), channel); @@ -134,9 +138,9 @@ void releaseChannel(Channel channel, String serverAddress) { } else { nettyClientKeyPool.returnObject(poolKeyMap.get(serverAddress), channel); } + } catch (Exception exx) { + LOGGER.error(exx.getMessage()); } - } catch (Exception exx) { - LOGGER.error(exx.getMessage()); } } @@ -151,6 +155,7 @@ void destroyChannel(String serverAddress, Channel channel) { try { if (channel.equals(channels.get(serverAddress))) { channels.remove(serverAddress); + resourceLocks.remove(serverAddress); } nettyClientKeyPool.returnObject(poolKeyMap.get(serverAddress), channel); } catch (Exception exx) { diff --git a/core/src/main/java/org/apache/seata/core/rpc/processor/server/ServerOnRequestProcessor.java b/core/src/main/java/org/apache/seata/core/rpc/processor/server/ServerOnRequestProcessor.java index 5431c19e630..87e48d4ed95 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/processor/server/ServerOnRequestProcessor.java +++ b/core/src/main/java/org/apache/seata/core/rpc/processor/server/ServerOnRequestProcessor.java @@ -30,10 +30,11 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; - +import java.util.concurrent.locks.Condition; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import org.apache.seata.common.ConfigurationKeys; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.common.thread.NamedThreadFactory; import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.common.util.NetUtil; @@ -94,7 +95,9 @@ public class ServerOnRequestProcessor implements RemotingProcessor, Disposable { private ExecutorService batchResponseExecutorService; private final ConcurrentMap> basketMap = new ConcurrentHashMap<>(); - protected final Object batchResponseLock = new Object(); + protected final ResourceLock batchResponseLock = new ResourceLock(); + protected final Condition condition = batchResponseLock.newCondition(); + private volatile boolean isResponding = false; private static final int MAX_BATCH_RESPONSE_MILLS = 1; private static final int MAX_BATCH_RESPONSE_THREAD = 1; @@ -215,8 +218,8 @@ private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) private void notifyBatchRespondingThread() { if (!isResponding) { - synchronized (batchResponseLock) { - batchResponseLock.notifyAll(); + try (ResourceLock ignored = batchResponseLock.obtain()){ + condition.signalAll(); } } } @@ -242,12 +245,10 @@ private class BatchResponseRunnable implements Runnable { @Override public void run() { while (true) { - synchronized (batchResponseLock) { - try { - batchResponseLock.wait(MAX_BATCH_RESPONSE_MILLS); - } catch (InterruptedException e) { - LOGGER.error("BatchResponseRunnable Interrupted error", e); - } + try (ResourceLock ignored = batchResponseLock.obtain()){ + condition.await(MAX_BATCH_RESPONSE_MILLS, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOGGER.error("BatchResponseRunnable Interrupted error", e); } isResponding = true; basketMap.forEach((channel, msgQueue) -> { diff --git a/discovery/seata-discovery-eureka/src/main/java/org/apache/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java b/discovery/seata-discovery-eureka/src/main/java/org/apache/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java index 5ab5191234d..2938337acf6 100644 --- a/discovery/seata-discovery-eureka/src/main/java/org/apache/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java +++ b/discovery/seata-discovery-eureka/src/main/java/org/apache/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java @@ -26,6 +26,7 @@ import com.netflix.discovery.EurekaEventListener; import com.netflix.discovery.shared.Application; import org.apache.seata.common.exception.EurekaRegistryException; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.common.util.NetUtil; import org.apache.seata.common.util.StringUtils; @@ -68,7 +69,7 @@ public class EurekaRegistryServiceImpl implements RegistryService> LISTENER_SERVICE_MAP = new ConcurrentHashMap<>(); private static final ConcurrentMap> CLUSTER_ADDRESS_MAP = new ConcurrentHashMap<>(); - private static final ConcurrentMap CLUSTER_LOCK = new ConcurrentHashMap<>(); + private static final ConcurrentMap CLUSTER_LOCK = new ConcurrentHashMap<>(); private static volatile ApplicationInfoManager applicationInfoManager; private static volatile CustomEurekaInstanceConfig instanceConfig; @@ -140,14 +141,16 @@ public List lookup(String key) throws Exception { } String clusterUpperName = clusterName.toUpperCase(); if (!LISTENER_SERVICE_MAP.containsKey(clusterUpperName)) { - Object lock = CLUSTER_LOCK.computeIfAbsent(clusterUpperName, k -> new Object()); - synchronized (lock) { + try (ResourceLock lock = CLUSTER_LOCK.computeIfAbsent(clusterUpperName, k -> new ResourceLock()); + ResourceLock ignored = lock.obtain()) { if (!LISTENER_SERVICE_MAP.containsKey(clusterUpperName)) { refreshCluster(clusterUpperName); subscribe(clusterUpperName, event -> { refreshCluster(clusterUpperName); }); } + } finally { + CLUSTER_LOCK.remove(clusterUpperName); } } return CLUSTER_ADDRESS_MAP.get(clusterUpperName); diff --git a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/fence/hook/TccHookManager.java b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/fence/hook/TccHookManager.java index e6d537c73f2..33a1ec66d0b 100644 --- a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/fence/hook/TccHookManager.java +++ b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/fence/hook/TccHookManager.java @@ -20,11 +20,13 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import org.apache.seata.common.lock.ResourceLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public final class TccHookManager { private static final Logger LOGGER = LoggerFactory.getLogger(TccHookManager.class); + private static final ResourceLock LOCK = new ResourceLock(); private TccHookManager() { @@ -40,7 +42,7 @@ private TccHookManager() { */ public static List getHooks() { if (CACHED_UNMODIFIABLE_HOOKS == null) { - synchronized (TccHookManager.class) { + try (ResourceLock ignored = LOCK.obtain()){ if (CACHED_UNMODIFIABLE_HOOKS == null) { CACHED_UNMODIFIABLE_HOOKS = Collections.unmodifiableList(TCC_HOOKS); } diff --git a/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/utils/LoopContextHolder.java b/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/utils/LoopContextHolder.java index 519b0a02372..39d9a54bbf3 100644 --- a/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/utils/LoopContextHolder.java +++ b/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/utils/LoopContextHolder.java @@ -18,8 +18,11 @@ import java.util.Collection; import java.util.Stack; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.seata.common.lock.ResourceLock; +import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.saga.proctrl.ProcessContext; import org.apache.seata.saga.statelang.domain.DomainConstants; @@ -37,19 +40,23 @@ public class LoopContextHolder { private final Stack loopCounterStack = new Stack<>(); private final Stack forwardCounterStack = new Stack<>(); private Collection collection; + private static final ConcurrentHashMap CONTEXT_LOCK_MAP = new ConcurrentHashMap<>(); public static LoopContextHolder getCurrent(ProcessContext context, boolean forceCreate) { LoopContextHolder loopContextHolder = (LoopContextHolder)context.getVariable( DomainConstants.VAR_NAME_CURRENT_LOOP_CONTEXT_HOLDER); if (null == loopContextHolder && forceCreate) { - synchronized (context) { + try (ResourceLock lock = CollectionUtils.computeIfAbsent(CONTEXT_LOCK_MAP, context, k -> new ResourceLock()); + ResourceLock ignored = lock.obtain()) { loopContextHolder = (LoopContextHolder)context.getVariable( DomainConstants.VAR_NAME_CURRENT_LOOP_CONTEXT_HOLDER); if (null == loopContextHolder) { loopContextHolder = new LoopContextHolder(); context.setVariable(DomainConstants.VAR_NAME_CURRENT_LOOP_CONTEXT_HOLDER, loopContextHolder); } + } finally { + CONTEXT_LOCK_MAP.remove(context); } } return loopContextHolder; diff --git a/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/utils/LoopTaskUtils.java b/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/utils/LoopTaskUtils.java index e71818f1f1a..ddf4dce187b 100644 --- a/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/utils/LoopTaskUtils.java +++ b/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/utils/LoopTaskUtils.java @@ -22,9 +22,11 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import org.apache.seata.common.exception.FrameworkErrorCode; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.common.util.NumberUtils; import org.apache.seata.common.util.StringUtils; @@ -56,6 +58,8 @@ public class LoopTaskUtils { public static final String LOOP_STATE_NAME_PATTERN = "-loop-"; + private static final ConcurrentHashMap HOLDER_LOCK_MAP = new ConcurrentHashMap<>(); + /** * get Loop Config from State * @@ -224,7 +228,8 @@ public static boolean isCompletionConditionSatisfied(ProcessContext context) { int nrOfCompletedInstances = currentLoopContext.getNrOfCompletedInstances().get(); if (!currentLoopContext.isCompletionConditionSatisfied()) { - synchronized (currentLoopContext) { + try (ResourceLock lock = CollectionUtils.computeIfAbsent(HOLDER_LOCK_MAP, currentLoopContext, k -> new ResourceLock()); + ResourceLock ignored = lock.obtain()) { if (!currentLoopContext.isCompletionConditionSatisfied()) { Map stateMachineContext = (Map)context.getVariable( DomainConstants.VAR_NAME_STATEMACHINE_CONTEXT); @@ -242,6 +247,8 @@ public static boolean isCompletionConditionSatisfied(ProcessContext context) { currentLoopContext.setCompletionConditionSatisfied(true); } } + } finally { + HOLDER_LOCK_MAP.remove(currentLoopContext); } } From 34d146f41fdd22f5019386ca2c2db07cea301b07 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B1=AA=E5=BF=A0=E7=A5=A5?= Date: Tue, 12 Nov 2024 17:55:49 +0800 Subject: [PATCH 02/14] =?UTF-8?q?feature:=E6=96=B0=E5=A2=9EResourceLock?= =?UTF-8?q?=E6=9B=BF=E6=8D=A2synchronized?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../SkipCallbackWrapperException.java | 11 +- .../EnhancedServiceNotFoundException.java | 8 +- .../core/rpc/netty/RmNettyRemotingClient.java | 4 +- .../core/rpc/netty/TmNettyRemotingClient.java | 4 +- .../parser/DefaultRemotingParser.java | 6 +- .../seata/rm/datasource/util/JdbcUtils.java | 4 +- .../rm/datasource/xa/ConnectionProxyXA.java | 179 ++++++++++-------- .../rm/datasource/xa/ResourceManagerXA.java | 19 +- .../rocketmq/SeataMQProducerFactory.java | 5 +- .../pcext/handlers/ChoiceStateHandler.java | 10 +- .../ServiceTaskHandlerInterceptor.java | 7 +- .../pcext/utils/CompensationHolder.java | 9 +- .../saga/engine/pcext/utils/EngineUtils.java | 7 +- .../engine/pcext/utils/ParameterUtils.java | 12 +- .../repo/impl/StateMachineRepositoryImpl.java | 12 +- .../proctrl/eventing/impl/DirectEventBus.java | 7 +- .../impl/SpringBeanServiceInvoker.java | 12 +- 17 files changed, 202 insertions(+), 114 deletions(-) diff --git a/common/src/main/java/org/apache/seata/common/exception/SkipCallbackWrapperException.java b/common/src/main/java/org/apache/seata/common/exception/SkipCallbackWrapperException.java index 5282954324a..55cc7b3732a 100644 --- a/common/src/main/java/org/apache/seata/common/exception/SkipCallbackWrapperException.java +++ b/common/src/main/java/org/apache/seata/common/exception/SkipCallbackWrapperException.java @@ -16,6 +16,8 @@ */ package org.apache.seata.common.exception; +import org.apache.seata.common.lock.ResourceLock; + /** * Skip Callback Wrapper Exception. * This exception class will make the semantics clearer. @@ -23,14 +25,17 @@ * @since above 1.4.2 */ public class SkipCallbackWrapperException extends RuntimeException { + private final ResourceLock resourceLock = new ResourceLock(); public SkipCallbackWrapperException(Throwable cause) { super(cause); } @Override - public synchronized Throwable fillInStackTrace() { - // do nothing - return null; + public Throwable fillInStackTrace() { + try (ResourceLock ignored = resourceLock.obtain()) { + // do nothing + return null; + } } } diff --git a/common/src/main/java/org/apache/seata/common/loader/EnhancedServiceNotFoundException.java b/common/src/main/java/org/apache/seata/common/loader/EnhancedServiceNotFoundException.java index 88bc211d5d3..887ef88e283 100644 --- a/common/src/main/java/org/apache/seata/common/loader/EnhancedServiceNotFoundException.java +++ b/common/src/main/java/org/apache/seata/common/loader/EnhancedServiceNotFoundException.java @@ -17,6 +17,7 @@ package org.apache.seata.common.loader; import org.apache.commons.lang.exception.NestableRuntimeException; +import org.apache.seata.common.lock.ResourceLock; /** * The type Enhanced service not found exception. @@ -24,6 +25,7 @@ */ public class EnhancedServiceNotFoundException extends NestableRuntimeException { private static final long serialVersionUID = 7748438218914409019L; + private final ResourceLock resourceLock = new ResourceLock(); /** * Instantiates a new Enhanced service not found exception. @@ -75,7 +77,9 @@ public EnhancedServiceNotFoundException(Throwable cause) { } @Override - public synchronized Throwable fillInStackTrace() { - return this; + public Throwable fillInStackTrace() { + try (ResourceLock ignored = resourceLock.obtain()) { + return this; + } } } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java index 0b9fd0e12bc..d1962baa082 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java @@ -29,6 +29,7 @@ import org.apache.seata.common.DefaultValues; import org.apache.seata.common.exception.FrameworkErrorCode; import org.apache.seata.common.exception.FrameworkException; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.common.thread.NamedThreadFactory; import org.apache.seata.common.util.StringUtils; import org.apache.seata.config.CachedConfigurationChangeListener; @@ -66,6 +67,7 @@ public final class RmNettyRemotingClient extends AbstractNettyRemotingClient { private final AtomicBoolean initialized = new AtomicBoolean(false); private static final long KEEP_ALIVE_TIME = Integer.MAX_VALUE; private static final int MAX_QUEUE_SIZE = 20000; + private static final ResourceLock RESOURCE_LOCK = new ResourceLock(); private String applicationId; private String transactionServiceGroup; @@ -128,7 +130,7 @@ public static RmNettyRemotingClient getInstance(String applicationId, String tra */ public static RmNettyRemotingClient getInstance() { if (instance == null) { - synchronized (RmNettyRemotingClient.class) { + try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { if (instance == null) { NettyClientConfig nettyClientConfig = new NettyClientConfig(); final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor( diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java index 28993b61f77..1bfb7829980 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java @@ -28,6 +28,7 @@ import org.apache.seata.common.DefaultValues; import org.apache.seata.common.exception.FrameworkException; import org.apache.seata.common.loader.EnhancedServiceLoader; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.common.thread.NamedThreadFactory; import org.apache.seata.common.thread.RejectedPolicies; import org.apache.seata.common.util.NetUtil; @@ -57,6 +58,7 @@ public final class TmNettyRemotingClient extends AbstractNettyRemotingClient { private static volatile TmNettyRemotingClient instance; private static final long KEEP_ALIVE_TIME = Integer.MAX_VALUE; private static final int MAX_QUEUE_SIZE = 2000; + private static final ResourceLock RESOURCE_LOCK = new ResourceLock(); private final AtomicBoolean initialized = new AtomicBoolean(false); private String applicationId; private String transactionServiceGroup; @@ -122,7 +124,7 @@ public static TmNettyRemotingClient getInstance(String applicationId, String tra */ public static TmNettyRemotingClient getInstance() { if (instance == null) { - synchronized (TmNettyRemotingClient.class) { + try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { if (instance == null) { NettyClientConfig nettyClientConfig = new NettyClientConfig(); final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor( diff --git a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/remoting/parser/DefaultRemotingParser.java b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/remoting/parser/DefaultRemotingParser.java index 0ed9625e616..0c4ba0de936 100644 --- a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/remoting/parser/DefaultRemotingParser.java +++ b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/remoting/parser/DefaultRemotingParser.java @@ -23,6 +23,7 @@ import org.apache.seata.common.exception.FrameworkException; import org.apache.seata.common.loader.EnhancedServiceLoader; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.integration.tx.api.remoting.RemotingDesc; import org.apache.seata.integration.tx.api.remoting.RemotingParser; @@ -37,12 +38,13 @@ public class DefaultRemotingParser { * all remoting bean parser */ protected static List allRemotingParsers = new ArrayList<>(); - /** * all remoting beans beanName -> RemotingDesc */ protected static Map remotingServiceMap = new ConcurrentHashMap<>(); + private final ResourceLock resourceLock = new ResourceLock(); + private static class SingletonHolder { private static final DefaultRemotingParser INSTANCE = new DefaultRemotingParser(); } @@ -79,7 +81,7 @@ protected void initRemotingParser() { * @param remotingParser */ public boolean registerRemotingParser(RemotingParser remotingParser) { - synchronized (this) { + try (ResourceLock ignored = resourceLock.obtain()) { return allRemotingParsers.add(remotingParser); } } diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/util/JdbcUtils.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/util/JdbcUtils.java index 2e5c8b1f336..1ee4c1ef63c 100644 --- a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/util/JdbcUtils.java +++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/util/JdbcUtils.java @@ -17,6 +17,7 @@ package org.apache.seata.rm.datasource.util; import org.apache.seata.common.loader.EnhancedServiceLoader; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.rm.BaseDataSourceResource; import org.apache.seata.rm.DefaultResourceManager; import org.apache.seata.sqlparser.SqlParserType; @@ -33,10 +34,11 @@ public final class JdbcUtils { private static volatile DbTypeParser dbTypeParser; + private final static ResourceLock RESOURCE_LOCK = new ResourceLock(); static DbTypeParser getDbTypeParser() { if (dbTypeParser == null) { - synchronized (JdbcUtils.class) { + try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { if (dbTypeParser == null) { dbTypeParser = EnhancedServiceLoader.load(DbTypeParser.class, SqlParserType.SQL_PARSER_TYPE_DRUID); } diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java index 6d35dfd6301..200b3b8c032 100644 --- a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java +++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java @@ -24,6 +24,7 @@ import javax.transaction.xa.XAResource; import org.apache.seata.common.DefaultValues; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.common.util.StringUtils; import org.apache.seata.config.ConfigurationFactory; import org.apache.seata.core.exception.TransactionException; @@ -69,6 +70,8 @@ public class ConnectionProxyXA extends AbstractConnectionProxyXA implements Hold private boolean shouldBeHeld = false; + private final static ResourceLock RESOURCE_LOCK = new ResourceLock(); + /** * Constructor of Connection Proxy for XA mode. * @@ -127,10 +130,12 @@ private void xaEnd(XAXid xaXid, int flags) throws XAException { * @param applicationData application data * @throws SQLException SQLException */ - public synchronized void xaCommit(String xid, long branchId, String applicationData) throws XAException { - XAXid xaXid = XAXidBuilder.build(xid, branchId); - xaResource.commit(xaXid, false); - releaseIfNecessary(); + public void xaCommit(String xid, long branchId, String applicationData) throws XAException { + try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { + XAXid xaXid = XAXidBuilder.build(xid, branchId); + xaResource.commit(xaXid, false); + releaseIfNecessary(); + } } /** @@ -139,12 +144,14 @@ public synchronized void xaCommit(String xid, long branchId, String applicationD * @param branchId transaction branch id * @param applicationData application data */ - public synchronized void xaRollback(String xid, long branchId, String applicationData) throws XAException { - if (this.xaBranchXid != null) { - xaRollback(xaBranchXid); - } else { - XAXid xaXid = XAXidBuilder.build(xid, branchId); - xaRollback(xaXid); + public void xaRollback(String xid, long branchId, String applicationData) throws XAException { + try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { + if (this.xaBranchXid != null) { + xaRollback(xaBranchXid); + } else { + XAXid xaXid = XAXidBuilder.build(xid, branchId); + xaRollback(xaXid); + } } } @@ -214,43 +221,45 @@ public boolean getAutoCommit() throws SQLException { } @Override - public synchronized void commit() throws SQLException { - if (currentAutoCommitStatus || isReadOnly()) { - // Ignore the committing on an autocommit session and read-only transaction. - return; - } - if (!xaActive || this.xaBranchXid == null) { - throw new SQLException("should NOT commit on an inactive session", SQLSTATE_XA_NOT_END); - } - try { - // XA End: Success - try { - end(XAResource.TMSUCCESS); - } catch (SQLException sqle) { - // Rollback immediately before the XA Branch Context is deleted. - String xaBranchXid = this.xaBranchXid.toString(); - rollback(); - throw new SQLException("Branch " + xaBranchXid + " was rollbacked on committing since " + sqle.getMessage(), SQLSTATE_XA_NOT_END, sqle); + public void commit() throws SQLException { + try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { + if (currentAutoCommitStatus || isReadOnly()) { + // Ignore the committing on an autocommit session and read-only transaction. + return; } - long now = System.currentTimeMillis(); - checkTimeout(now); - setPrepareTime(now); - int prepare = xaResource.prepare(xaBranchXid); - // Based on the four databases: MySQL (8), Oracle (12c), Postgres (16), and MSSQL Server (2022), - // only Oracle has read-only optimization; the others do not provide read-only feedback. - // Therefore, the database type check can be eliminated here. - if (prepare == XAResource.XA_RDONLY) { - // Branch Report to TC: RDONLY - reportStatusToTC(BranchStatus.PhaseOne_RDONLY); + if (!xaActive || this.xaBranchXid == null) { + throw new SQLException("should NOT commit on an inactive session", SQLSTATE_XA_NOT_END); + } + try { + // XA End: Success + try { + end(XAResource.TMSUCCESS); + } catch (SQLException sqle) { + // Rollback immediately before the XA Branch Context is deleted. + String xaBranchXid = this.xaBranchXid.toString(); + rollback(); + throw new SQLException("Branch " + xaBranchXid + " was rollbacked on committing since " + sqle.getMessage(), SQLSTATE_XA_NOT_END, sqle); + } + long now = System.currentTimeMillis(); + checkTimeout(now); + setPrepareTime(now); + int prepare = xaResource.prepare(xaBranchXid); + // Based on the four databases: MySQL (8), Oracle (12c), Postgres (16), and MSSQL Server (2022), + // only Oracle has read-only optimization; the others do not provide read-only feedback. + // Therefore, the database type check can be eliminated here. + if (prepare == XAResource.XA_RDONLY) { + // Branch Report to TC: RDONLY + reportStatusToTC(BranchStatus.PhaseOne_RDONLY); + } + } catch (XAException xe) { + // Branch Report to TC: Failed + reportStatusToTC(BranchStatus.PhaseOne_Failed); + throw new SQLException( + "Failed to end(TMSUCCESS)/prepare xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe + .getMessage(), xe); + } finally { + cleanXABranchContext(); } - } catch (XAException xe) { - // Branch Report to TC: Failed - reportStatusToTC(BranchStatus.PhaseOne_Failed); - throw new SQLException( - "Failed to end(TMSUCCESS)/prepare xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe - .getMessage(), xe); - } finally { - cleanXABranchContext(); } } @@ -280,29 +289,33 @@ public void rollback() throws SQLException { } } - private synchronized void start() throws XAException, SQLException { - // 3. XA Start - if (JdbcConstants.ORACLE.equals(resource.getDbType())) { - xaResource.start(this.xaBranchXid, SeataXAResource.ORATRANSLOOSE); - } else { - xaResource.start(this.xaBranchXid, XAResource.TMNOFLAGS); - } + private void start() throws XAException, SQLException { + try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { + // 3. XA Start + if (JdbcConstants.ORACLE.equals(resource.getDbType())) { + xaResource.start(this.xaBranchXid, SeataXAResource.ORATRANSLOOSE); + } else { + xaResource.start(this.xaBranchXid, XAResource.TMNOFLAGS); + } - try { - termination(); - } catch (SQLException e) { - // the framework layer does not actively call ROLLBACK when setAutoCommit throws an SQL exception - xaResource.end(this.xaBranchXid, XAResource.TMFAIL); - xaRollback(xaBranchXid); - // Branch Report to TC: Failed - reportStatusToTC(BranchStatus.PhaseOne_Failed); - throw e; + try { + termination(); + } catch (SQLException e) { + // the framework layer does not actively call ROLLBACK when setAutoCommit throws an SQL exception + xaResource.end(this.xaBranchXid, XAResource.TMFAIL); + xaRollback(xaBranchXid); + // Branch Report to TC: Failed + reportStatusToTC(BranchStatus.PhaseOne_Failed); + throw e; + } } } - private synchronized void end(int flags) throws XAException, SQLException { - xaEnd(xaBranchXid, flags); - termination(); + private void end(int flags) throws XAException, SQLException { + try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { + xaEnd(xaBranchXid, flags); + termination(); + } } private void cleanXABranchContext() { @@ -323,27 +336,31 @@ private void checkTimeout(Long now) throws XAException { } @Override - public synchronized void close() throws SQLException { - rollBacked = false; - if (isHeld() && shouldBeHeld()) { - // if kept by a keeper, just hold the connection. - return; + public void close() throws SQLException { + try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { + rollBacked = false; + if (isHeld() && shouldBeHeld()) { + // if kept by a keeper, just hold the connection. + return; + } + cleanXABranchContext(); + originalConnection.close(); } - cleanXABranchContext(); - originalConnection.close(); } - protected synchronized void closeForce() throws SQLException { - Connection physicalConn = getWrappedConnection(); - if (physicalConn instanceof PooledConnection) { - physicalConn = ((PooledConnection) physicalConn).getConnection(); + protected void closeForce() throws SQLException { + try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { + Connection physicalConn = getWrappedConnection(); + if (physicalConn instanceof PooledConnection) { + physicalConn = ((PooledConnection) physicalConn).getConnection(); + } + // Force close the physical connection + physicalConn.close(); + rollBacked = false; + cleanXABranchContext(); + originalConnection.close(); + releaseIfNecessary(); } - // Force close the physical connection - physicalConn.close(); - rollBacked = false; - cleanXABranchContext(); - originalConnection.close(); - releaseIfNecessary(); } @Override diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ResourceManagerXA.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ResourceManagerXA.java index 8added9ff64..2755b3c4b79 100644 --- a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ResourceManagerXA.java +++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ResourceManagerXA.java @@ -17,13 +17,16 @@ package org.apache.seata.rm.datasource.xa; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.sql.SQLException; import javax.transaction.xa.XAException; import org.apache.seata.common.DefaultValues; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.common.thread.NamedThreadFactory; +import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.config.ConfigurationFactory; import org.apache.seata.core.exception.TransactionException; import org.apache.seata.core.model.BranchStatus; @@ -49,6 +52,8 @@ public class ResourceManagerXA extends AbstractDataSourceCacheResourceManager { private static final long SCHEDULE_DELAY_MILLS = 60 * 1000L; private static final long SCHEDULE_INTERVAL_MILLS = 1000L; + private final ResourceLock resourceLock = new ResourceLock(); + private final ConcurrentHashMap connectionLocks = new ConcurrentHashMap<>(); /** * The Timer check xa branch two phase hold timeout. */ @@ -61,35 +66,37 @@ public void init() { public void initXaTwoPhaseTimeoutChecker() { if (xaTwoPhaseTimeoutChecker == null) { - synchronized (this) { + try (ResourceLock ignored = resourceLock.obtain()) { if (xaTwoPhaseTimeoutChecker == null) { boolean shouldBeHold = dataSourceCache.values().parallelStream().anyMatch(resource -> { if (resource instanceof DataSourceProxyXA) { - return ((DataSourceProxyXA)resource).isShouldBeHeld(); + return ((DataSourceProxyXA) resource).isShouldBeHeld(); } return false; }); if (shouldBeHold) { xaTwoPhaseTimeoutChecker = new ScheduledThreadPoolExecutor(1, - new NamedThreadFactory("xaTwoPhaseTimeoutChecker", 1, true)); + new NamedThreadFactory("xaTwoPhaseTimeoutChecker", 1, true)); xaTwoPhaseTimeoutChecker.scheduleAtFixedRate(() -> { for (Map.Entry entry : dataSourceCache.entrySet()) { - BaseDataSourceResource resource = (BaseDataSourceResource)entry.getValue(); + BaseDataSourceResource resource = (BaseDataSourceResource) entry.getValue(); if (resource.isShouldBeHeld()) { if (resource instanceof DataSourceProxyXA) { Map keeper = resource.getKeeper(); for (Map.Entry connectionEntry : keeper.entrySet()) { ConnectionProxyXA connection = connectionEntry.getValue(); long now = System.currentTimeMillis(); - synchronized (connection) { + try (ResourceLock ignored2 = CollectionUtils.computeIfAbsent(connectionLocks, connection, key -> new ResourceLock()).obtain()) { if (connection.getPrepareTime() != null - && now - connection.getPrepareTime() > TWO_PHASE_HOLD_TIMEOUT) { + && now - connection.getPrepareTime() > TWO_PHASE_HOLD_TIMEOUT) { try { connection.closeForce(); } catch (SQLException e) { LOGGER.warn("Force close the xa physical connection fail", e); } } + } finally { + connectionLocks.remove(connection); } } } diff --git a/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducerFactory.java b/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducerFactory.java index 63414aa1290..c18e6f758a0 100644 --- a/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducerFactory.java +++ b/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducerFactory.java @@ -19,6 +19,7 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.remoting.RPCHook; import org.apache.seata.common.exception.NotSupportYetException; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.core.model.BranchType; import org.apache.seata.integration.tx.api.util.ProxyUtil; @@ -29,7 +30,7 @@ public class SeataMQProducerFactory { public static final String ROCKET_TCC_NAME = "tccRocketMQ"; public static final BranchType ROCKET_BRANCH_TYPE = BranchType.TCC; - + private static final ResourceLock RESOURCE_LOCK = new ResourceLock(); /** * Default Producer, it can be replaced to Map after multi-resource is supported */ @@ -42,7 +43,7 @@ public static SeataMQProducer createSingle(String nameServer, String producerGro public static SeataMQProducer createSingle(String nameServer, String namespace, String groupName, RPCHook rpcHook) throws MQClientException { if (defaultProducer == null) { - synchronized (SeataMQProducerFactory.class) { + try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { if (defaultProducer == null) { defaultProducer = new SeataMQProducer(namespace, groupName, rpcHook); defaultProducer.setNamesrvAddr(nameServer); diff --git a/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/handlers/ChoiceStateHandler.java b/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/handlers/ChoiceStateHandler.java index ee6fac0c137..8523186b2e8 100644 --- a/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/handlers/ChoiceStateHandler.java +++ b/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/handlers/ChoiceStateHandler.java @@ -19,8 +19,11 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.seata.common.exception.FrameworkErrorCode; +import org.apache.seata.common.lock.ResourceLock; +import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.common.util.StringUtils; import org.apache.seata.saga.engine.StateMachineConfig; import org.apache.seata.saga.engine.exception.EngineExecutionException; @@ -41,19 +44,18 @@ * */ public class ChoiceStateHandler implements StateHandler { + private final ConcurrentHashMap choiceStateLocks = new ConcurrentHashMap<>(); @Override public void process(ProcessContext context) throws EngineExecutionException { - StateInstruction instruction = context.getInstruction(StateInstruction.class); ChoiceStateImpl choiceState = (ChoiceStateImpl)instruction.getState(context); Map choiceEvaluators = choiceState.getChoiceEvaluators(); if (choiceEvaluators == null) { - synchronized (choiceState) { + try (ResourceLock ignored = CollectionUtils.computeIfAbsent(choiceStateLocks, choiceState, k -> new ResourceLock()).obtain()) { choiceEvaluators = choiceState.getChoiceEvaluators(); if (choiceEvaluators == null) { - List choices = choiceState.getChoices(); if (choices == null) { choiceEvaluators = new LinkedHashMap<>(0); @@ -68,6 +70,8 @@ public void process(ProcessContext context) throws EngineExecutionException { } choiceState.setChoiceEvaluators(choiceEvaluators); } + } finally { + choiceStateLocks.remove(choiceState); } } diff --git a/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/interceptors/ServiceTaskHandlerInterceptor.java b/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/interceptors/ServiceTaskHandlerInterceptor.java index 1cee6d2ccf4..a2b7f394513 100644 --- a/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/interceptors/ServiceTaskHandlerInterceptor.java +++ b/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/interceptors/ServiceTaskHandlerInterceptor.java @@ -20,9 +20,11 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.seata.common.exception.FrameworkErrorCode; import org.apache.seata.common.loader.LoadLevel; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.common.util.StringUtils; import org.apache.seata.saga.engine.StateMachineConfig; @@ -60,6 +62,7 @@ public class ServiceTaskHandlerInterceptor implements StateHandlerInterceptor { private static final Logger LOGGER = LoggerFactory.getLogger(ServiceTaskHandlerInterceptor.class); + private final ConcurrentHashMap taskStateLocks = new ConcurrentHashMap<>(); @Override public boolean match(Class clazz) { @@ -312,7 +315,7 @@ private void decideExecutionStatus(ProcessContext context, StateInstance stateIn Map statusEvaluators = state.getStatusEvaluators(); if (statusEvaluators == null) { - synchronized (state) { + try (ResourceLock ignored = CollectionUtils.computeIfAbsent(taskStateLocks, state, k -> new ResourceLock()).obtain()) { statusEvaluators = state.getStatusEvaluators(); if (statusEvaluators == null) { statusEvaluators = new LinkedHashMap<>(statusMatchList.size()); @@ -328,6 +331,8 @@ private void decideExecutionStatus(ProcessContext context, StateInstance stateIn } } state.setStatusEvaluators(statusEvaluators); + } finally { + taskStateLocks.remove(state); } } diff --git a/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/utils/CompensationHolder.java b/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/utils/CompensationHolder.java index d7d19d42a4f..d6df01fcd22 100644 --- a/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/utils/CompensationHolder.java +++ b/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/utils/CompensationHolder.java @@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.seata.common.exception.FrameworkErrorCode; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.common.util.StringUtils; import org.apache.seata.saga.engine.exception.EngineExecutionException; @@ -59,19 +60,21 @@ public class CompensationHolder { */ private Stack stateStackNeedCompensation = new Stack<>(); - public static CompensationHolder getCurrent(ProcessContext context, boolean forceCreate) { + private static final ConcurrentHashMap contextLocks = new ConcurrentHashMap<>(); + public static CompensationHolder getCurrent(ProcessContext context, boolean forceCreate) { CompensationHolder compensationholder = (CompensationHolder)context.getVariable( DomainConstants.VAR_NAME_CURRENT_COMPENSATION_HOLDER); if (compensationholder == null && forceCreate) { - synchronized (context) { - + try (ResourceLock ignored = CollectionUtils.computeIfAbsent(contextLocks, context, k -> new ResourceLock()).obtain()) { compensationholder = (CompensationHolder)context.getVariable( DomainConstants.VAR_NAME_CURRENT_COMPENSATION_HOLDER); if (compensationholder == null) { compensationholder = new CompensationHolder(); context.setVariable(DomainConstants.VAR_NAME_CURRENT_COMPENSATION_HOLDER, compensationholder); } + } finally { + contextLocks.remove(context); } } return compensationholder; diff --git a/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/utils/EngineUtils.java b/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/utils/EngineUtils.java index 9d906aaf820..e44f18dd8bc 100644 --- a/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/utils/EngineUtils.java +++ b/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/utils/EngineUtils.java @@ -20,8 +20,10 @@ import java.util.Date; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Semaphore; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.common.util.StringUtils; import org.apache.seata.saga.engine.AsyncCallback; @@ -44,6 +46,7 @@ public class EngineUtils { private static final Logger LOGGER = LoggerFactory.getLogger(EngineUtils.class); + private static final ConcurrentHashMap exceptionMatchLocks = new ConcurrentHashMap<>(); /** * generate parent id @@ -198,7 +201,7 @@ public static void handleException(ProcessContext context, AbstractTaskState sta List> exceptionClasses = exceptionMatch.getExceptionClasses(); if (CollectionUtils.isNotEmpty(exceptions)) { if (exceptionClasses == null) { - synchronized (exceptionMatch) { + try (ResourceLock ignored = CollectionUtils.computeIfAbsent(exceptionMatchLocks, exceptionMatch, k -> new ResourceLock()).obtain()) { exceptionClasses = exceptionMatch.getExceptionClasses(); if (exceptionClasses == null) { @@ -230,6 +233,8 @@ public static void handleException(ProcessContext context, AbstractTaskState sta } exceptionMatch.setExceptionClasses(exceptionClasses); } + } finally { + exceptionMatchLocks.remove(exceptionMatch); } } diff --git a/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/utils/ParameterUtils.java b/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/utils/ParameterUtils.java index 92356024e5a..1dbbc55462b 100644 --- a/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/utils/ParameterUtils.java +++ b/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/utils/ParameterUtils.java @@ -16,6 +16,7 @@ */ package org.apache.seata.saga.engine.pcext.utils; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.common.util.StringUtils; import org.apache.seata.saga.engine.expression.Expression; @@ -29,6 +30,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** * @@ -36,6 +38,8 @@ * */ public class ParameterUtils { + private static final ConcurrentHashMap inTaskStateLocks = new ConcurrentHashMap<>(); + private static final ConcurrentHashMap outTaskStateLocks = new ConcurrentHashMap<>(); public static List createInputParams(ExpressionResolver expressionResolver, StateInstanceImpl stateInstance, @@ -47,7 +51,7 @@ public static List createInputParams(ExpressionResolver expressionResolv List inputExpressions = serviceTaskState.getInputExpressions(); if (inputExpressions == null) { - synchronized (serviceTaskState) { + try (ResourceLock ignored = CollectionUtils.computeIfAbsent(inTaskStateLocks, serviceTaskState, k -> new ResourceLock()).obtain()) { inputExpressions = serviceTaskState.getInputExpressions(); if (inputExpressions == null) { inputExpressions = new ArrayList<>(inputAssignments.size()); @@ -56,6 +60,8 @@ public static List createInputParams(ExpressionResolver expressionResolv } } serviceTaskState.setInputExpressions(inputExpressions); + } finally { + inTaskStateLocks.remove(serviceTaskState); } } List inputValues = new ArrayList<>(inputExpressions.size()); @@ -76,7 +82,7 @@ public static Map createOutputParams(ExpressionResolver expressi Map outputExpressions = serviceTaskState.getOutputExpressions(); if (outputExpressions == null) { - synchronized (serviceTaskState) { + try (ResourceLock ignored = CollectionUtils.computeIfAbsent(outTaskStateLocks, serviceTaskState, k -> new ResourceLock()).obtain()) { outputExpressions = serviceTaskState.getOutputExpressions(); if (outputExpressions == null) { outputExpressions = new LinkedHashMap<>(outputAssignments.size()); @@ -86,6 +92,8 @@ public static Map createOutputParams(ExpressionResolver expressi } } serviceTaskState.setOutputExpressions(outputExpressions); + } finally { + outTaskStateLocks.remove(serviceTaskState); } } Map outputValues = new LinkedHashMap<>(outputExpressions.size()); diff --git a/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/repo/impl/StateMachineRepositoryImpl.java b/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/repo/impl/StateMachineRepositoryImpl.java index 0def5110ecf..4c9d9aa1232 100644 --- a/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/repo/impl/StateMachineRepositoryImpl.java +++ b/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/repo/impl/StateMachineRepositoryImpl.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.common.util.StringUtils; import org.apache.seata.saga.engine.repo.StateMachineRepository; @@ -45,7 +46,10 @@ public class StateMachineRepositoryImpl implements StateMachineRepository { private static final Logger LOGGER = LoggerFactory.getLogger(StateMachineRepositoryImpl.class); private Map stateMachineMapByNameAndTenant = new ConcurrentHashMap<>(); + private final ConcurrentHashMap byNameAndTenantLocks = new ConcurrentHashMap<>(); private Map stateMachineMapById = new ConcurrentHashMap<>(); + private final ConcurrentHashMap byIdLocks = new ConcurrentHashMap<>(); + private StateLangStore stateLangStore; private SeqGenerator seqGenerator = new UUIDSeqGenerator(); private String charset = "UTF-8"; @@ -57,7 +61,7 @@ public StateMachine getStateMachineById(String stateMachineId) { Item item = CollectionUtils.computeIfAbsent(stateMachineMapById, stateMachineId, key -> new Item()); if (item.getValue() == null && stateLangStore != null) { - synchronized (item) { + try (ResourceLock ignored = CollectionUtils.computeIfAbsent(byIdLocks, item, k -> new ResourceLock()).obtain()) { if (item.getValue() == null) { StateMachine stateMachine = stateLangStore.getStateMachineById(stateMachineId); if (stateMachine != null) { @@ -75,6 +79,8 @@ public StateMachine getStateMachineById(String stateMachineId) { item); } } + } finally { + byIdLocks.remove(item); } } return item.getValue(); @@ -85,7 +91,7 @@ public StateMachine getStateMachine(String stateMachineName, String tenantId) { Item item = CollectionUtils.computeIfAbsent(stateMachineMapByNameAndTenant, stateMachineName + "_" + tenantId, key -> new Item()); if (item.getValue() == null && stateLangStore != null) { - synchronized (item) { + try (ResourceLock ignored = CollectionUtils.computeIfAbsent(byNameAndTenantLocks, item, k -> new ResourceLock()).obtain()) { if (item.getValue() == null) { StateMachine stateMachine = stateLangStore.getLastVersionStateMachine(stateMachineName, tenantId); if (stateMachine != null) { @@ -103,6 +109,8 @@ public StateMachine getStateMachine(String stateMachineName, String tenantId) { } } + } finally { + byNameAndTenantLocks.remove(item); } } return item.getValue(); diff --git a/saga/seata-saga-processctrl/src/main/java/org/apache/seata/saga/proctrl/eventing/impl/DirectEventBus.java b/saga/seata-saga-processctrl/src/main/java/org/apache/seata/saga/proctrl/eventing/impl/DirectEventBus.java index 2ff2fb50bbc..10be41e415c 100644 --- a/saga/seata-saga-processctrl/src/main/java/org/apache/seata/saga/proctrl/eventing/impl/DirectEventBus.java +++ b/saga/seata-saga-processctrl/src/main/java/org/apache/seata/saga/proctrl/eventing/impl/DirectEventBus.java @@ -18,8 +18,10 @@ import java.util.List; import java.util.Stack; +import java.util.concurrent.ConcurrentHashMap; import org.apache.seata.common.exception.FrameworkException; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.saga.proctrl.ProcessContext; import org.apache.seata.saga.proctrl.eventing.EventConsumer; @@ -35,6 +37,7 @@ public class DirectEventBus extends AbstractEventBus { private static final Logger LOGGER = LoggerFactory.getLogger(DirectEventBus.class); private static final String VAR_NAME_SYNC_EXE_STACK = "_sync_execution_stack_"; + private final ConcurrentHashMap contextLocks = new ConcurrentHashMap<>(); @Override public boolean offer(ProcessContext context) throws FrameworkException { @@ -49,13 +52,15 @@ public boolean offer(ProcessContext context) throws FrameworkException { boolean isFirstEvent = false; Stack currentStack = (Stack)context.getVariable(VAR_NAME_SYNC_EXE_STACK); if (currentStack == null) { - synchronized (context) { + try (ResourceLock ignored = CollectionUtils.computeIfAbsent(contextLocks, context, k -> new ResourceLock()).obtain()) { currentStack = (Stack)context.getVariable(VAR_NAME_SYNC_EXE_STACK); if (currentStack == null) { currentStack = new Stack<>(); context.setVariable(VAR_NAME_SYNC_EXE_STACK, currentStack); isFirstEvent = true; } + } finally { + contextLocks.remove(context); } } diff --git a/saga/seata-saga-spring/src/main/java/org/apache/seata/saga/engine/invoker/impl/SpringBeanServiceInvoker.java b/saga/seata-saga-spring/src/main/java/org/apache/seata/saga/engine/invoker/impl/SpringBeanServiceInvoker.java index d97f9c3972d..3f919714ad5 100644 --- a/saga/seata-saga-spring/src/main/java/org/apache/seata/saga/engine/invoker/impl/SpringBeanServiceInvoker.java +++ b/saga/seata-saga-spring/src/main/java/org/apache/seata/saga/engine/invoker/impl/SpringBeanServiceInvoker.java @@ -25,10 +25,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; import org.apache.seata.common.exception.FrameworkErrorCode; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.saga.engine.exception.EngineExecutionException; import org.apache.seata.saga.engine.invoker.ServiceInvoker; @@ -56,6 +58,8 @@ public class SpringBeanServiceInvoker implements ServiceInvoker, ApplicationCont private ApplicationContext applicationContext; private ThreadPoolExecutor threadPoolExecutor; private String sagaJsonParser; + private final ConcurrentHashMap taskStateLocks = new ConcurrentHashMap<>(); + private final ConcurrentHashMap retryLocks = new ConcurrentHashMap<>(); @Override public Object invoke(ServiceTaskState serviceTaskState, Object... input) throws Throwable { @@ -97,7 +101,7 @@ protected Object doInvoke(ServiceTaskStateImpl state, Object[] input) throws Thr Method method = state.getMethod(); if (method == null) { - synchronized (state) { + try (ResourceLock ignored = CollectionUtils.computeIfAbsent(taskStateLocks, state, k -> new ResourceLock()).obtain()) { method = state.getMethod(); if (method == null) { method = findMethod(bean.getClass(), state.getServiceMethod(), state.getParameterTypes()); @@ -105,6 +109,8 @@ protected Object doInvoke(ServiceTaskStateImpl state, Object[] input) throws Thr state.setMethod(method); } } + } finally { + taskStateLocks.remove(state); } } @@ -182,7 +188,7 @@ private Retry matchRetryConfig(List retryList, Throwable e) { } else { List> exceptionClasses = retryConfig.getExceptionClasses(); if (exceptionClasses == null) { - synchronized (retryConfig) { + try (ResourceLock ignored = CollectionUtils.computeIfAbsent(retryLocks, retryConfig, k -> new ResourceLock()).obtain()) { exceptionClasses = retryConfig.getExceptionClasses(); if (exceptionClasses == null) { @@ -214,6 +220,8 @@ private Retry matchRetryConfig(List retryList, Throwable e) { } retryConfig.setExceptionClasses(exceptionClasses); } + } finally { + retryLocks.remove(retryConfig); } } From 916262f1fe35d1e77ba2af1cf08e9eb4e59c6a5d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B1=AA=E5=BF=A0=E7=A5=A5?= Date: Sat, 16 Nov 2024 21:21:00 +0800 Subject: [PATCH 03/14] =?UTF-8?q?feature:=E6=96=B0=E5=A2=9EResourceLock?= =?UTF-8?q?=E6=9B=BF=E6=8D=A2synchronized?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../integration/tx/api/util/ProxyUtil.java | 38 +++++++++---------- .../seata/server/session/GlobalSession.java | 9 +++-- 2 files changed, 25 insertions(+), 22 deletions(-) diff --git a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/util/ProxyUtil.java b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/util/ProxyUtil.java index 964840c993a..af7f17c2768 100644 --- a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/util/ProxyUtil.java +++ b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/util/ProxyUtil.java @@ -16,6 +16,7 @@ */ package org.apache.seata.integration.tx.api.util; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.integration.tx.api.interceptor.handler.DefaultInvocationHandler; import org.apache.seata.integration.tx.api.interceptor.handler.ProxyInvocationHandler; import org.apache.seata.integration.tx.api.interceptor.parser.DefaultInterfaceParser; @@ -31,6 +32,7 @@ public class ProxyUtil { private static final Map PROXYED_SET = new HashMap<>(); + private static final ResourceLock RESOURCE_LOCK = new ResourceLock(); public static T createProxy(T target) { return createProxy(target, target.getClass().getName()); @@ -52,26 +54,24 @@ public static T createProxy(T target) { * @param the generics class */ public static T createProxy(T target, String beanName) { - try { - synchronized (PROXYED_SET) { - if (PROXYED_SET.containsKey(target)) { - return (T) PROXYED_SET.get(target); - } - ProxyInvocationHandler proxyInvocationHandler = DefaultInterfaceParser.get().parserInterfaceToProxy(target, beanName); - if (proxyInvocationHandler == null) { - return target; - } - T proxy = (T) new ByteBuddy().subclass(target.getClass()) - .method(isDeclaredBy(target.getClass())) - .intercept(InvocationHandlerAdapter.of(new DefaultInvocationHandler(proxyInvocationHandler, target))) - .make() - .load(target.getClass().getClassLoader()) - .getLoaded() - .getDeclaredConstructor() - .newInstance(); - PROXYED_SET.put(target, proxy); - return proxy; + try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { + if (PROXYED_SET.containsKey(target)) { + return (T) PROXYED_SET.get(target); } + ProxyInvocationHandler proxyInvocationHandler = DefaultInterfaceParser.get().parserInterfaceToProxy(target, beanName); + if (proxyInvocationHandler == null) { + return target; + } + T proxy = (T) new ByteBuddy().subclass(target.getClass()) + .method(isDeclaredBy(target.getClass())) + .intercept(InvocationHandlerAdapter.of(new DefaultInvocationHandler(proxyInvocationHandler, target))) + .make() + .load(target.getClass().getClassLoader()) + .getLoaded() + .getDeclaredConstructor() + .newInstance(); + PROXYED_SET.put(target, proxy); + return proxy; } catch (Throwable t) { throw new RuntimeException("error occurs when create seata proxy", t); } diff --git a/server/src/main/java/org/apache/seata/server/session/GlobalSession.java b/server/src/main/java/org/apache/seata/server/session/GlobalSession.java index 8cfc0ecbc67..0dbb9df0bb7 100644 --- a/server/src/main/java/org/apache/seata/server/session/GlobalSession.java +++ b/server/src/main/java/org/apache/seata/server/session/GlobalSession.java @@ -31,6 +31,7 @@ import org.apache.seata.common.Constants; import org.apache.seata.common.DefaultValues; import org.apache.seata.common.XID; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.common.util.BufferUtils; import org.apache.seata.common.util.StringUtils; import org.apache.seata.config.ConfigurationFactory; @@ -107,6 +108,8 @@ public class GlobalSession implements SessionLifecycle, SessionStorable { private Set lifecycleListeners = new HashSet<>(2); + private final ResourceLock RESOURCE_LOCK = new ResourceLock(); + /** * Add boolean. * @@ -129,7 +132,7 @@ public boolean add(BranchSession branchSession) { * @return the boolean */ public boolean remove(BranchSession branchSession) { - synchronized (this) { + try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { return branchSessions.remove(branchSession); } } @@ -328,7 +331,7 @@ public void addBranch(BranchSession branchSession) throws TransactionException { public void loadBranchs() { if (branchSessions == null && isLazyLoadBranch()) { - synchronized (this) { + try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { if (branchSessions == null && isLazyLoadBranch()) { branchSessions = new ArrayList<>(); Optional.ofNullable(SessionHolder.getRootSessionManager().findGlobalSession(xid, true)) @@ -376,7 +379,7 @@ public void removeAndUnlockBranch(BranchSession branchSession) throws Transactio * @return the branch */ public BranchSession getBranch(long branchId) { - synchronized (this) { + try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { List branchSessions = getBranchSessions(); for (BranchSession branchSession : branchSessions) { if (branchSession.getBranchId() == branchId) { From 9dbf1072baedf7947e392b1ba1f1388e7dfe243d Mon Sep 17 00:00:00 2001 From: lightClouds917 Date: Thu, 28 Nov 2024 19:51:21 +0800 Subject: [PATCH 04/14] =?UTF-8?q?feature:=E6=96=B0=E5=A2=9EResourceLock?= =?UTF-8?q?=E6=9B=BF=E6=8D=A2synchronized?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../exception/SkipCallbackWrapperException.java | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/common/src/main/java/org/apache/seata/common/exception/SkipCallbackWrapperException.java b/common/src/main/java/org/apache/seata/common/exception/SkipCallbackWrapperException.java index 55cc7b3732a..5282954324a 100644 --- a/common/src/main/java/org/apache/seata/common/exception/SkipCallbackWrapperException.java +++ b/common/src/main/java/org/apache/seata/common/exception/SkipCallbackWrapperException.java @@ -16,8 +16,6 @@ */ package org.apache.seata.common.exception; -import org.apache.seata.common.lock.ResourceLock; - /** * Skip Callback Wrapper Exception. * This exception class will make the semantics clearer. @@ -25,17 +23,14 @@ * @since above 1.4.2 */ public class SkipCallbackWrapperException extends RuntimeException { - private final ResourceLock resourceLock = new ResourceLock(); public SkipCallbackWrapperException(Throwable cause) { super(cause); } @Override - public Throwable fillInStackTrace() { - try (ResourceLock ignored = resourceLock.obtain()) { - // do nothing - return null; - } + public synchronized Throwable fillInStackTrace() { + // do nothing + return null; } } From 2674e1a742d75760fef6836e0d39b5857845d0ff Mon Sep 17 00:00:00 2001 From: lightClouds917 Date: Fri, 29 Nov 2024 09:10:29 +0800 Subject: [PATCH 05/14] =?UTF-8?q?feature:=E6=96=B0=E5=A2=9EResourceLock?= =?UTF-8?q?=E6=9B=BF=E6=8D=A2synchronized?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/loader/EnhancedServiceNotFoundException.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/common/src/main/java/org/apache/seata/common/loader/EnhancedServiceNotFoundException.java b/common/src/main/java/org/apache/seata/common/loader/EnhancedServiceNotFoundException.java index 887ef88e283..88bc211d5d3 100644 --- a/common/src/main/java/org/apache/seata/common/loader/EnhancedServiceNotFoundException.java +++ b/common/src/main/java/org/apache/seata/common/loader/EnhancedServiceNotFoundException.java @@ -17,7 +17,6 @@ package org.apache.seata.common.loader; import org.apache.commons.lang.exception.NestableRuntimeException; -import org.apache.seata.common.lock.ResourceLock; /** * The type Enhanced service not found exception. @@ -25,7 +24,6 @@ */ public class EnhancedServiceNotFoundException extends NestableRuntimeException { private static final long serialVersionUID = 7748438218914409019L; - private final ResourceLock resourceLock = new ResourceLock(); /** * Instantiates a new Enhanced service not found exception. @@ -77,9 +75,7 @@ public EnhancedServiceNotFoundException(Throwable cause) { } @Override - public Throwable fillInStackTrace() { - try (ResourceLock ignored = resourceLock.obtain()) { - return this; - } + public synchronized Throwable fillInStackTrace() { + return this; } } From a0f2e9f4f0ce4a1d60d36d9e88ac751c976af3e2 Mon Sep 17 00:00:00 2001 From: lightClouds917 Date: Fri, 29 Nov 2024 09:47:57 +0800 Subject: [PATCH 06/14] =?UTF-8?q?feature:=E6=96=B0=E5=A2=9EResourceLock?= =?UTF-8?q?=E6=9B=BF=E6=8D=A2synchronized?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/org/apache/seata/common/util/UUIDGenerator.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/src/main/java/org/apache/seata/common/util/UUIDGenerator.java b/common/src/main/java/org/apache/seata/common/util/UUIDGenerator.java index 9316c3011f7..7b8ea727599 100644 --- a/common/src/main/java/org/apache/seata/common/util/UUIDGenerator.java +++ b/common/src/main/java/org/apache/seata/common/util/UUIDGenerator.java @@ -24,7 +24,7 @@ public class UUIDGenerator { private static volatile IdWorker idWorker; - private final static ResourceLock resourceLock = new ResourceLock(); + private final static ResourceLock RESOURCE_LOCK = new ResourceLock(); /** * generate UUID using snowflake algorithm @@ -33,7 +33,7 @@ public class UUIDGenerator { */ public static long generateUUID() { if (idWorker == null) { - try (ResourceLock ignored = resourceLock.obtain()) { + try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { if (idWorker == null) { init(null); } From 33d6ddd3a2e178f5a077b81094e44f7dc47ba994 Mon Sep 17 00:00:00 2001 From: lightClouds917 Date: Fri, 29 Nov 2024 10:00:33 +0800 Subject: [PATCH 07/14] =?UTF-8?q?feature:=E6=96=B0=E5=A2=9EResourceLock?= =?UTF-8?q?=E6=9B=BF=E6=8D=A2synchronized?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../seata/core/rpc/netty/AbstractNettyRemoting.java | 2 +- .../seata/core/rpc/netty/AbstractNettyRemotingClient.java | 8 ++++---- .../seata/core/rpc/netty/AbstractNettyRemotingServer.java | 2 +- .../seata/core/rpc/netty/NettyClientChannelManager.java | 2 +- .../rpc/processor/server/ServerOnRequestProcessor.java | 4 ++-- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemoting.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemoting.java index ea899182fa2..e9d0d171405 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemoting.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemoting.java @@ -346,7 +346,7 @@ protected String getAddressFromChannel(Channel channel) { private void channelWritableCheck(Channel channel, Object msg) { int tryTimes = 0; - try (ResourceLock ignored = resourceLock.obtain()){ + try (ResourceLock ignored = resourceLock.obtain()) { while (!channel.isWritable()) { try { tryTimes++; diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java index c8634ef04ef..ba3f5dfcaca 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java @@ -85,7 +85,7 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting private static final long SCHEDULE_INTERVAL_MILLS = 10 * 1000L; private static final String MERGE_THREAD_PREFIX = "rpcMergeMessageSend"; private final ResourceLock mergerLock = new ResourceLock(); - private final Condition mergeCondition= mergerLock.newCondition(); + private final Condition mergeCondition = mergerLock.newCondition(); /** * When sending message type is {@link MergeMessage}, will be stored to mergeMsgMap. @@ -166,7 +166,7 @@ public Object sendSyncRequest(Object msg) throws TimeoutException { LOGGER.debug("offer message: {}", rpcMessage.getBody()); } if (!isSending) { - try (ResourceLock ignored = mergerLock.obtain()){ + try (ResourceLock ignored = mergerLock.obtain()) { mergeCondition.notifyAll(); } } @@ -347,7 +347,7 @@ private class MergedSendRunnable implements Runnable { @Override public void run() { while (true) { - try (ResourceLock ignored = mergerLock.obtain()){ + try (ResourceLock ignored = mergerLock.obtain()) { mergeCondition.await(MAX_MERGE_SEND_MILLS, TimeUnit.MILLISECONDS); } catch (InterruptedException ignored) { } @@ -432,7 +432,7 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) { - try (ResourceLock ignored = resourceLock.obtain()){ + try (ResourceLock ignored = resourceLock.obtain()) { if (ctx.channel().isWritable()) { condition.signalAll(); } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java index 7399dd8049e..1d37506c852 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java @@ -178,7 +178,7 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) { - try (ResourceLock ignored = resourceLock.obtain()){ + try (ResourceLock ignored = resourceLock.obtain()) { if (ctx.channel().isWritable()) { condition.signalAll(); } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientChannelManager.java b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientChannelManager.java index 2a9f33e359f..61469a7b4e2 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientChannelManager.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientChannelManager.java @@ -107,7 +107,7 @@ Channel acquireChannel(String serverAddress) { LOGGER.info("will connect to {}", serverAddress); } ResourceLock lock = CollectionUtils.computeIfAbsent(resourceLocks, serverAddress, key -> new ResourceLock()); - try (ResourceLock ignored = lock.obtain()){ + try (ResourceLock ignored = lock.obtain()) { return doConnect(serverAddress); } } diff --git a/core/src/main/java/org/apache/seata/core/rpc/processor/server/ServerOnRequestProcessor.java b/core/src/main/java/org/apache/seata/core/rpc/processor/server/ServerOnRequestProcessor.java index 87e48d4ed95..b53d59f1c7b 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/processor/server/ServerOnRequestProcessor.java +++ b/core/src/main/java/org/apache/seata/core/rpc/processor/server/ServerOnRequestProcessor.java @@ -218,7 +218,7 @@ private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) private void notifyBatchRespondingThread() { if (!isResponding) { - try (ResourceLock ignored = batchResponseLock.obtain()){ + try (ResourceLock ignored = batchResponseLock.obtain()) { condition.signalAll(); } } @@ -245,7 +245,7 @@ private class BatchResponseRunnable implements Runnable { @Override public void run() { while (true) { - try (ResourceLock ignored = batchResponseLock.obtain()){ + try (ResourceLock ignored = batchResponseLock.obtain()) { condition.await(MAX_BATCH_RESPONSE_MILLS, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { LOGGER.error("BatchResponseRunnable Interrupted error", e); From c9f0fe911de1dd379c7b0d47df3827e2cdadb3e3 Mon Sep 17 00:00:00 2001 From: lightClouds917 Date: Mon, 9 Dec 2024 19:55:36 +0800 Subject: [PATCH 08/14] =?UTF-8?q?feature:=E6=96=B0=E5=A2=9EResourceLock?= =?UTF-8?q?=E6=9B=BF=E6=8D=A2synchronized;fix=20signalAll?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../seata/core/rpc/netty/AbstractNettyRemotingClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java index ba3f5dfcaca..88174970521 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java @@ -167,7 +167,7 @@ public Object sendSyncRequest(Object msg) throws TimeoutException { } if (!isSending) { try (ResourceLock ignored = mergerLock.obtain()) { - mergeCondition.notifyAll(); + mergeCondition.signalAll(); } } From 17524b18eb7977c950bcf8a73bb28b862123a82d Mon Sep 17 00:00:00 2001 From: lightClouds917 Date: Tue, 10 Dec 2024 09:40:11 +0800 Subject: [PATCH 09/14] =?UTF-8?q?feature:=E6=96=B0=E5=A2=9EResourceLock?= =?UTF-8?q?=E6=9B=BF=E6=8D=A2synchronized;fix=20test,add=20test?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../seata/common/lock/ResourceLock.java | 7 + .../seata/common/lock/ResourceLockTest.java | 123 ++++++++++++++++++ .../netty/NettyClientChannelManagerTest.java | 7 +- 3 files changed, 134 insertions(+), 3 deletions(-) create mode 100644 common/src/test/java/org/apache/seata/common/lock/ResourceLockTest.java diff --git a/common/src/main/java/org/apache/seata/common/lock/ResourceLock.java b/common/src/main/java/org/apache/seata/common/lock/ResourceLock.java index 311d0208670..22e815e5784 100644 --- a/common/src/main/java/org/apache/seata/common/lock/ResourceLock.java +++ b/common/src/main/java/org/apache/seata/common/lock/ResourceLock.java @@ -46,6 +46,13 @@ public ResourceLock obtain() { } + /** + * Unlock the resource lock. + * + *

This is typically used in try-with-resources blocks to automatically + * unlock the resource lock when the block is exited, regardless of whether + * an exception is thrown or not. + */ @Override public void close() { this.unlock(); diff --git a/common/src/test/java/org/apache/seata/common/lock/ResourceLockTest.java b/common/src/test/java/org/apache/seata/common/lock/ResourceLockTest.java new file mode 100644 index 00000000000..7b5ecd893fe --- /dev/null +++ b/common/src/test/java/org/apache/seata/common/lock/ResourceLockTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.common.lock; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +import static org.junit.jupiter.api.Assertions.*; + +@ExtendWith(MockitoExtension.class) +public class ResourceLockTest { + + @Test + public void testObtainAndClose() { + ResourceLock resourceLock = new ResourceLock(); + + // Test obtaining the lock + try (ResourceLock lock = resourceLock.obtain()) { + assertTrue(resourceLock.isHeldByCurrentThread(), "Lock should be held by current thread"); + } + + // After try-with-resources, lock should be released + assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should be released after try-with-resources"); + } + + @Test + public void testMultipleObtainAndClose() { + ResourceLock resourceLock = new ResourceLock(); + + // First obtain and release + try (ResourceLock lock = resourceLock.obtain()) { + assertTrue(resourceLock.isHeldByCurrentThread(), "Lock should be held by current thread"); + } + assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should be released after first try-with-resources"); + + // Second obtain and release + try (ResourceLock lock = resourceLock.obtain()) { + assertTrue(resourceLock.isHeldByCurrentThread(), "Lock should be held by current thread"); + } + assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should be released after second try-with-resources"); + } + + @Test + public void testConcurrentLocking() throws InterruptedException { + ResourceLock resourceLock = new ResourceLock(); + + Thread t1 = new Thread(() -> { + try (ResourceLock lock = resourceLock.obtain()) { + try { + Thread.sleep(100); // Hold the lock for 100ms + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + }); + + Thread t2 = new Thread(() -> { + assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should not be held by current thread before t1 releases it"); + try (ResourceLock lock = resourceLock.obtain()) { + assertTrue(resourceLock.isHeldByCurrentThread(), "Lock should be held by current thread after t1 releases it"); + } + }); + + t1.start(); + t2.start(); + + t1.join(); + t2.join(); + + assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should be released after both threads complete"); + } + + @Test + public void testLockInterruptibly() throws InterruptedException { + ResourceLock resourceLock = new ResourceLock(); + + Thread t1 = new Thread(() -> { + try (ResourceLock lock = resourceLock.obtain()) { + try { + Thread.sleep(1000); // Hold the lock for 1000ms + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + }); + + t1.start(); + Thread.sleep(50); // Wait for t1 to acquire the lock + + Thread t2 = new Thread(() -> { + try { + resourceLock.lockInterruptibly(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + t2.start(); + Thread.sleep(50); // Wait for t2 to attempt to acquire the lock + + t2.interrupt(); // Interrupt t2 + + t1.join(); + t2.join(); + + assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should be released after t1 completes"); + } +} diff --git a/core/src/test/java/org/apache/seata/core/rpc/netty/NettyClientChannelManagerTest.java b/core/src/test/java/org/apache/seata/core/rpc/netty/NettyClientChannelManagerTest.java index 02f8e990cfe..4791de0372c 100644 --- a/core/src/test/java/org/apache/seata/core/rpc/netty/NettyClientChannelManagerTest.java +++ b/core/src/test/java/org/apache/seata/core/rpc/netty/NettyClientChannelManagerTest.java @@ -18,6 +18,7 @@ import io.netty.channel.Channel; import org.apache.commons.pool.impl.GenericKeyedObjectPool; +import org.apache.seata.common.lock.ResourceLock; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -148,9 +149,9 @@ void assertReleaseChannelNotEqualToCache() throws Exception { @SuppressWarnings("unchecked") private void setUpReleaseChannel() { - ConcurrentMap channelLocks = - (ConcurrentMap) getFieldValue("channelLocks", channelManager); - channelLocks.putIfAbsent("127.0.0.1:8091", new Object()); + ConcurrentMap channelLocks = + (ConcurrentMap) getFieldValue("resourceLocks", channelManager); + channelLocks.putIfAbsent("127.0.0.1:8091", new ResourceLock()); ConcurrentMap poolKeyMap = (ConcurrentMap) getFieldValue("poolKeyMap", channelManager); poolKeyMap.putIfAbsent("127.0.0.1:8091", nettyPoolKey); From 613612c95a47eefbeb7ced806be264401236de46 Mon Sep 17 00:00:00 2001 From: lightClouds917 Date: Tue, 10 Dec 2024 09:46:38 +0800 Subject: [PATCH 10/14] =?UTF-8?q?feature:=E6=96=B0=E5=A2=9EResourceLock?= =?UTF-8?q?=E6=9B=BF=E6=8D=A2synchronized;changelog?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- changes/en-us/2.x.md | 1 + changes/zh-cn/2.x.md | 1 + 2 files changed, 2 insertions(+) diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index 1f903679dfa..41664b3d00d 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -11,6 +11,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6992](https://github.com/apache/incubator-seata/pull/6992)] support grpc serializer - [[#6973](https://github.com/apache/incubator-seata/pull/6973)] support saga annotation - [[#6926](https://github.com/apache/incubator-seata/pull/6926)] support ssl communication for raft nodes +- [[#6983](https://github.com/apache/incubator-seata/pull/6983)] Support jdk21 virtual threads, and replace synchronized with ReentrantLock ### bugfix: diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 54c3dabf742..7530cd40189 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -12,6 +12,7 @@ - [[#6995](https://github.com/apache/incubator-seata/pull/6995)] 升级过时的 npmjs 依赖 - [[#6973](https://github.com/apache/incubator-seata/pull/6973)] 支持saga注解化 - [[#6926](https://github.com/apache/incubator-seata/pull/6926)] 支持Raft节点间的SSL通信 +- [[#6983](https://github.com/apache/incubator-seata/pull/6983)] 支持jdk21虚拟线程,用ReentrantLock替换synchronized ### bugfix: - [[#6899](https://github.com/apache/incubator-seata/pull/6899)] 修复file.conf打包后的读取 From 70610f4d3071f0a66c9cfce505514402ede29427 Mon Sep 17 00:00:00 2001 From: lightClouds917 Date: Tue, 10 Dec 2024 10:19:15 +0800 Subject: [PATCH 11/14] =?UTF-8?q?feature:=E6=96=B0=E5=A2=9EResourceLock?= =?UTF-8?q?=E6=9B=BF=E6=8D=A2synchronized;fix=20PMD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../saga/engine/pcext/utils/CompensationHolder.java | 6 +++--- .../seata/saga/engine/pcext/utils/EngineUtils.java | 6 +++--- .../saga/engine/pcext/utils/ParameterUtils.java | 12 ++++++------ 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/utils/CompensationHolder.java b/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/utils/CompensationHolder.java index 7a40a807126..733f9dfbd83 100644 --- a/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/utils/CompensationHolder.java +++ b/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/utils/CompensationHolder.java @@ -61,13 +61,13 @@ public class CompensationHolder { */ private Stack stateStackNeedCompensation = new Stack<>(); - private static final ConcurrentHashMap contextLocks = new ConcurrentHashMap<>(); + private static final ConcurrentHashMap CONTEXT_LOCKS = new ConcurrentHashMap<>(); public static CompensationHolder getCurrent(ProcessContext context, boolean forceCreate) { CompensationHolder compensationholder = (CompensationHolder)context.getVariable( DomainConstants.VAR_NAME_CURRENT_COMPENSATION_HOLDER); if (compensationholder == null && forceCreate) { - try (ResourceLock ignored = CollectionUtils.computeIfAbsent(contextLocks, context, k -> new ResourceLock()).obtain()) { + try (ResourceLock ignored = CollectionUtils.computeIfAbsent(CONTEXT_LOCKS, context, k -> new ResourceLock()).obtain()) { compensationholder = (CompensationHolder)context.getVariable( DomainConstants.VAR_NAME_CURRENT_COMPENSATION_HOLDER); if (compensationholder == null) { @@ -75,7 +75,7 @@ public static CompensationHolder getCurrent(ProcessContext context, boolean forc context.setVariable(DomainConstants.VAR_NAME_CURRENT_COMPENSATION_HOLDER, compensationholder); } } finally { - contextLocks.remove(context); + CONTEXT_LOCKS.remove(context); } } return compensationholder; diff --git a/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/utils/EngineUtils.java b/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/utils/EngineUtils.java index e44f18dd8bc..741b1d0f3f8 100644 --- a/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/utils/EngineUtils.java +++ b/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/utils/EngineUtils.java @@ -46,7 +46,7 @@ public class EngineUtils { private static final Logger LOGGER = LoggerFactory.getLogger(EngineUtils.class); - private static final ConcurrentHashMap exceptionMatchLocks = new ConcurrentHashMap<>(); + private static final ConcurrentHashMap EXCEPTION_MATCH_LOCKS = new ConcurrentHashMap<>(); /** * generate parent id @@ -201,7 +201,7 @@ public static void handleException(ProcessContext context, AbstractTaskState sta List> exceptionClasses = exceptionMatch.getExceptionClasses(); if (CollectionUtils.isNotEmpty(exceptions)) { if (exceptionClasses == null) { - try (ResourceLock ignored = CollectionUtils.computeIfAbsent(exceptionMatchLocks, exceptionMatch, k -> new ResourceLock()).obtain()) { + try (ResourceLock ignored = CollectionUtils.computeIfAbsent(EXCEPTION_MATCH_LOCKS, exceptionMatch, k -> new ResourceLock()).obtain()) { exceptionClasses = exceptionMatch.getExceptionClasses(); if (exceptionClasses == null) { @@ -234,7 +234,7 @@ public static void handleException(ProcessContext context, AbstractTaskState sta exceptionMatch.setExceptionClasses(exceptionClasses); } } finally { - exceptionMatchLocks.remove(exceptionMatch); + EXCEPTION_MATCH_LOCKS.remove(exceptionMatch); } } diff --git a/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/utils/ParameterUtils.java b/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/utils/ParameterUtils.java index 1dbbc55462b..75f75d1d4a3 100644 --- a/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/utils/ParameterUtils.java +++ b/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/utils/ParameterUtils.java @@ -38,8 +38,8 @@ * */ public class ParameterUtils { - private static final ConcurrentHashMap inTaskStateLocks = new ConcurrentHashMap<>(); - private static final ConcurrentHashMap outTaskStateLocks = new ConcurrentHashMap<>(); + private static final ConcurrentHashMap IN_TASK_STATE_LOCKS = new ConcurrentHashMap<>(); + private static final ConcurrentHashMap OUT_TASK_STATE_LOCKS = new ConcurrentHashMap<>(); public static List createInputParams(ExpressionResolver expressionResolver, StateInstanceImpl stateInstance, @@ -51,7 +51,7 @@ public static List createInputParams(ExpressionResolver expressionResolv List inputExpressions = serviceTaskState.getInputExpressions(); if (inputExpressions == null) { - try (ResourceLock ignored = CollectionUtils.computeIfAbsent(inTaskStateLocks, serviceTaskState, k -> new ResourceLock()).obtain()) { + try (ResourceLock ignored = CollectionUtils.computeIfAbsent(IN_TASK_STATE_LOCKS, serviceTaskState, k -> new ResourceLock()).obtain()) { inputExpressions = serviceTaskState.getInputExpressions(); if (inputExpressions == null) { inputExpressions = new ArrayList<>(inputAssignments.size()); @@ -61,7 +61,7 @@ public static List createInputParams(ExpressionResolver expressionResolv } serviceTaskState.setInputExpressions(inputExpressions); } finally { - inTaskStateLocks.remove(serviceTaskState); + IN_TASK_STATE_LOCKS.remove(serviceTaskState); } } List inputValues = new ArrayList<>(inputExpressions.size()); @@ -82,7 +82,7 @@ public static Map createOutputParams(ExpressionResolver expressi Map outputExpressions = serviceTaskState.getOutputExpressions(); if (outputExpressions == null) { - try (ResourceLock ignored = CollectionUtils.computeIfAbsent(outTaskStateLocks, serviceTaskState, k -> new ResourceLock()).obtain()) { + try (ResourceLock ignored = CollectionUtils.computeIfAbsent(OUT_TASK_STATE_LOCKS, serviceTaskState, k -> new ResourceLock()).obtain()) { outputExpressions = serviceTaskState.getOutputExpressions(); if (outputExpressions == null) { outputExpressions = new LinkedHashMap<>(outputAssignments.size()); @@ -93,7 +93,7 @@ public static Map createOutputParams(ExpressionResolver expressi } serviceTaskState.setOutputExpressions(outputExpressions); } finally { - outTaskStateLocks.remove(serviceTaskState); + OUT_TASK_STATE_LOCKS.remove(serviceTaskState); } } Map outputValues = new LinkedHashMap<>(outputExpressions.size()); From d4ed7cb37c87629f618b4f645e5c6d08fe1b2a30 Mon Sep 17 00:00:00 2001 From: lightClouds917 Date: Tue, 10 Dec 2024 10:38:45 +0800 Subject: [PATCH 12/14] =?UTF-8?q?feature:=E6=96=B0=E5=A2=9EResourceLock?= =?UTF-8?q?=E6=9B=BF=E6=8D=A2synchronized;fix=20WhitespaceAround?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../seata/integration/tx/api/fence/hook/TccHookManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/fence/hook/TccHookManager.java b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/fence/hook/TccHookManager.java index 33a1ec66d0b..efec686cc80 100644 --- a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/fence/hook/TccHookManager.java +++ b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/fence/hook/TccHookManager.java @@ -42,7 +42,7 @@ private TccHookManager() { */ public static List getHooks() { if (CACHED_UNMODIFIABLE_HOOKS == null) { - try (ResourceLock ignored = LOCK.obtain()){ + try (ResourceLock ignored = LOCK.obtain()) { if (CACHED_UNMODIFIABLE_HOOKS == null) { CACHED_UNMODIFIABLE_HOOKS = Collections.unmodifiableList(TCC_HOOKS); } From 92fe155fd2e339434ad423a617de9aa2996d3828 Mon Sep 17 00:00:00 2001 From: lightClouds917 Date: Tue, 10 Dec 2024 10:54:44 +0800 Subject: [PATCH 13/14] =?UTF-8?q?feature:=E6=96=B0=E5=A2=9EResourceLock?= =?UTF-8?q?=E6=9B=BF=E6=8D=A2synchronized;fix=20PMD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/apache/seata/server/session/GlobalSession.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/apache/seata/server/session/GlobalSession.java b/server/src/main/java/org/apache/seata/server/session/GlobalSession.java index 544e0fe72c2..82ae755ed4b 100644 --- a/server/src/main/java/org/apache/seata/server/session/GlobalSession.java +++ b/server/src/main/java/org/apache/seata/server/session/GlobalSession.java @@ -108,7 +108,7 @@ public class GlobalSession implements SessionLifecycle, SessionStorable { private Set lifecycleListeners = new HashSet<>(2); - private final ResourceLock RESOURCE_LOCK = new ResourceLock(); + private final ResourceLock resourceLock = new ResourceLock(); /** * Add boolean. @@ -132,7 +132,7 @@ public boolean add(BranchSession branchSession) { * @return the boolean */ public boolean remove(BranchSession branchSession) { - try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { + try (ResourceLock ignored = resourceLock.obtain()) { return branchSessions.remove(branchSession); } } @@ -331,7 +331,7 @@ public void addBranch(BranchSession branchSession) throws TransactionException { public void loadBranchs() { if (branchSessions == null && isLazyLoadBranch()) { - try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { + try (ResourceLock ignored = resourceLock.obtain()) { if (branchSessions == null && isLazyLoadBranch()) { branchSessions = new ArrayList<>(); Optional.ofNullable(SessionHolder.getRootSessionManager().findGlobalSession(xid, true)) @@ -379,7 +379,7 @@ public void removeAndUnlockBranch(BranchSession branchSession) throws Transactio * @return the branch */ public BranchSession getBranch(long branchId) { - try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { + try (ResourceLock ignored = resourceLock.obtain()) { List branchSessions = getBranchSessions(); for (BranchSession branchSession : branchSessions) { if (branchSession.getBranchId() == branchId) { From 9ce065281ff3e6e62715a4ff7abc5affa06c5183 Mon Sep 17 00:00:00 2001 From: lightClouds917 Date: Wed, 11 Dec 2024 11:34:39 +0800 Subject: [PATCH 14/14] feature:add test --- .../seata/common/lock/ResourceLockTest.java | 23 +++++++++++++++++++ .../engine/pcext/utils/LoopContextHolder.java | 3 +-- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/common/src/test/java/org/apache/seata/common/lock/ResourceLockTest.java b/common/src/test/java/org/apache/seata/common/lock/ResourceLockTest.java index 7b5ecd893fe..1ceee9ca8cb 100644 --- a/common/src/test/java/org/apache/seata/common/lock/ResourceLockTest.java +++ b/common/src/test/java/org/apache/seata/common/lock/ResourceLockTest.java @@ -16,10 +16,13 @@ */ package org.apache.seata.common.lock; +import org.apache.seata.common.util.CollectionUtils; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.junit.jupiter.MockitoExtension; +import java.util.concurrent.ConcurrentHashMap; + import static org.junit.jupiter.api.Assertions.*; @ExtendWith(MockitoExtension.class) @@ -55,6 +58,26 @@ public void testMultipleObtainAndClose() { assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should be released after second try-with-resources"); } + @Test + public void testResourceLockAutoRemovalFromMap() { + ConcurrentHashMap lockMap = new ConcurrentHashMap<>(); + String key = "testKey"; + // Use try-with-resources to obtain and release the lock + try (ResourceLock ignored = CollectionUtils.computeIfAbsent(lockMap, key, k -> new ResourceLock()).obtain()) { + // Do something while holding the lock + assertTrue(lockMap.containsKey(key)); + assertTrue(lockMap.get(key).isHeldByCurrentThread()); + } finally { + assertFalse(lockMap.get(key).isHeldByCurrentThread()); + assertTrue(lockMap.containsKey(key)); + // Remove the lock from the map + lockMap.remove(key); + assertFalse(lockMap.containsKey(key)); + } + // Ensure the lock is removed from the map + assertFalse(lockMap.containsKey(key)); + } + @Test public void testConcurrentLocking() throws InterruptedException { ResourceLock resourceLock = new ResourceLock(); diff --git a/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/utils/LoopContextHolder.java b/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/utils/LoopContextHolder.java index 39d9a54bbf3..830d0212de2 100644 --- a/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/utils/LoopContextHolder.java +++ b/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/utils/LoopContextHolder.java @@ -47,8 +47,7 @@ public static LoopContextHolder getCurrent(ProcessContext context, boolean force DomainConstants.VAR_NAME_CURRENT_LOOP_CONTEXT_HOLDER); if (null == loopContextHolder && forceCreate) { - try (ResourceLock lock = CollectionUtils.computeIfAbsent(CONTEXT_LOCK_MAP, context, k -> new ResourceLock()); - ResourceLock ignored = lock.obtain()) { + try (ResourceLock ignored = CollectionUtils.computeIfAbsent(CONTEXT_LOCK_MAP, context, k -> new ResourceLock()).obtain()) { loopContextHolder = (LoopContextHolder)context.getVariable( DomainConstants.VAR_NAME_CURRENT_LOOP_CONTEXT_HOLDER); if (null == loopContextHolder) {