diff --git a/sentinel-cluster/sentinel-cluster-client-default/pom.xml b/sentinel-cluster/sentinel-cluster-client-default/pom.xml
index 3c61b2c658..7410143164 100644
--- a/sentinel-cluster/sentinel-cluster-client-default/pom.xml
+++ b/sentinel-cluster/sentinel-cluster-client-default/pom.xml
@@ -1,62 +1,62 @@
-
-
-
- sentinel-cluster
- com.alibaba.csp
- 1.8.0-SNAPSHOT
-
- 4.0.0
-
- sentinel-cluster-client-default
- jar
-
-
-
- com.alibaba.csp
- sentinel-core
-
-
- com.alibaba.csp
- sentinel-transport-common
- provided
-
-
- com.alibaba.csp
- sentinel-cluster-common-default
-
-
-
- io.netty
- netty-handler
-
-
-
- com.alibaba.csp
- sentinel-datasource-nacos
- test
-
-
- com.alibaba.csp
- sentinel-parameter-flow-control
- test
-
-
-
- junit
- junit
- test
-
-
- org.mockito
- mockito-core
- test
-
-
- org.assertj
- assertj-core
- test
-
-
+
+
+
+ sentinel-cluster
+ com.alibaba.csp
+ 1.8.0-SNAPSHOT
+
+ 4.0.0
+
+ sentinel-cluster-client-default
+ jar
+
+
+
+ com.alibaba.csp
+ sentinel-core
+
+
+ com.alibaba.csp
+ sentinel-transport-common
+ provided
+
+
+ com.alibaba.csp
+ sentinel-cluster-common-default
+
+
+
+ io.netty
+ netty-handler
+
+
+
+ com.alibaba.csp
+ sentinel-datasource-nacos
+ test
+
+
+ com.alibaba.csp
+ sentinel-parameter-flow-control
+ test
+
+
+
+ junit
+ junit
+ test
+
+
+ org.mockito
+ mockito-core
+ test
+
+
+ org.assertj
+ assertj-core
+ test
+
+
\ No newline at end of file
diff --git a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/ClientConstants.java b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/ClientConstants.java
index 41b1dd0121..fc3006c58c 100644
--- a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/ClientConstants.java
+++ b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/ClientConstants.java
@@ -24,6 +24,8 @@ public final class ClientConstants {
public static final int TYPE_PING = 0;
public static final int TYPE_FLOW = 1;
public static final int TYPE_PARAM_FLOW = 2;
+ public static final int TYPE_CONCURRENT_FLOW_ACQUIRE = 3;
+ public static final int TYPE_CONCURRENT_FLOW_RELEASE = 4;
public static final int CLIENT_STATUS_OFF = 0;
public static final int CLIENT_STATUS_PENDING = 1;
diff --git a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/DefaultClusterTokenClient.java b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/DefaultClusterTokenClient.java
index ecc727a5b2..b517877e56 100644
--- a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/DefaultClusterTokenClient.java
+++ b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/DefaultClusterTokenClient.java
@@ -15,27 +15,26 @@
*/
package com.alibaba.csp.sentinel.cluster.client;
-import java.util.Collection;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.alibaba.csp.sentinel.cluster.ClusterConstants;
-import com.alibaba.csp.sentinel.cluster.ClusterErrorMessages;
-import com.alibaba.csp.sentinel.cluster.ClusterTransportClient;
-import com.alibaba.csp.sentinel.cluster.TokenResult;
-import com.alibaba.csp.sentinel.cluster.TokenResultStatus;
-import com.alibaba.csp.sentinel.cluster.TokenServerDescriptor;
+import com.alibaba.csp.sentinel.cluster.*;
import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientAssignConfig;
import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientConfigManager;
import com.alibaba.csp.sentinel.cluster.client.config.ServerChangeObserver;
import com.alibaba.csp.sentinel.cluster.log.ClusterClientStatLogUtil;
import com.alibaba.csp.sentinel.cluster.request.ClusterRequest;
+import com.alibaba.csp.sentinel.cluster.request.data.ConcurrentFlowAcquireRequestData;
+import com.alibaba.csp.sentinel.cluster.request.data.ConcurrentFlowReleaseRequestData;
import com.alibaba.csp.sentinel.cluster.request.data.FlowRequestData;
import com.alibaba.csp.sentinel.cluster.request.data.ParamFlowRequestData;
import com.alibaba.csp.sentinel.cluster.response.ClusterResponse;
+import com.alibaba.csp.sentinel.cluster.response.data.ConcurrentFlowAcquireResponseData;
import com.alibaba.csp.sentinel.cluster.response.data.FlowTokenResponseData;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.util.StringUtil;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
/**
* Default implementation of {@link ClusterTokenClient}.
*
@@ -152,7 +151,7 @@ public TokenResult requestToken(Long flowId, int acquireCount, boolean prioritiz
return badRequest();
}
FlowRequestData data = new FlowRequestData().setCount(acquireCount)
- .setFlowId(flowId).setPriority(prioritized);
+ .setFlowId(flowId).setPriority(prioritized);
ClusterRequest request = new ClusterRequest<>(ClusterConstants.MSG_TYPE_FLOW, data);
try {
TokenResult result = sendTokenRequest(request);
@@ -170,7 +169,7 @@ public TokenResult requestParamToken(Long flowId, int acquireCount, Collection request = new ClusterRequest<>(ClusterConstants.MSG_TYPE_PARAM_FLOW, data);
try {
TokenResult result = sendTokenRequest(request);
@@ -182,6 +181,40 @@ public TokenResult requestParamToken(Long flowId, int acquireCount, Collection request = new ClusterRequest<>(ClusterConstants.MSG_TYPE_CONCURRENT_FLOW_ACQUIRE, data);
+ try {
+ TokenResult result = sendTokenRequest(request);
+ logForResult(result);
+ return result;
+ } catch (Exception ex) {
+ ClusterClientStatLogUtil.log(ex.getMessage());
+ return new TokenResult(TokenResultStatus.FAIL);
+ }
+ }
+
+ @Override
+ public TokenResult releaseConcurrentToken(Long tokenId) {
+ if (tokenId == null) {
+ return badRequest();
+ }
+ ConcurrentFlowReleaseRequestData data = new ConcurrentFlowReleaseRequestData().setTokenId(tokenId);
+ ClusterRequest request = new ClusterRequest<>(ClusterConstants.MSG_TYPE_CONCURRENT_FLOW_RELEASE, data);
+ try {
+ TokenResult result = sendTokenRequest(request);
+ logForResult(result);
+ return result;
+ } catch (Exception ex) {
+ ClusterClientStatLogUtil.log(ex.getMessage());
+ return new TokenResult(TokenResultStatus.FAIL);
+ }
+ }
+
private void logForResult(TokenResult result) {
switch (result.getStatus()) {
case TokenResultStatus.NO_RULE_EXISTS:
@@ -197,15 +230,29 @@ private void logForResult(TokenResult result) {
private TokenResult sendTokenRequest(ClusterRequest request) throws Exception {
if (transportClient == null) {
RecordLog.warn(
- "[DefaultClusterTokenClient] Client not created, please check your config for cluster client");
+ "[DefaultClusterTokenClient] Client not created, please check your config for cluster client");
return clientFail();
}
ClusterResponse response = transportClient.sendRequest(request);
TokenResult result = new TokenResult(response.getStatus());
if (response.getData() != null) {
- FlowTokenResponseData responseData = (FlowTokenResponseData)response.getData();
- result.setRemaining(responseData.getRemainingCount())
- .setWaitInMs(responseData.getWaitInMs());
+ switch (request.getType()) {
+ case ClusterConstants.MSG_TYPE_CONCURRENT_FLOW_RELEASE:
+ break;
+ case ClusterConstants.MSG_TYPE_CONCURRENT_FLOW_ACQUIRE:
+ ConcurrentFlowAcquireResponseData concurrentAcquireResponseData = (ConcurrentFlowAcquireResponseData) response.getData();
+ result.setTokenId(concurrentAcquireResponseData.getTokenId());
+ break;
+ case ClusterConstants.MSG_TYPE_PARAM_FLOW:
+ case ClusterConstants.MSG_TYPE_FLOW:
+ FlowTokenResponseData responseData = (FlowTokenResponseData) response.getData();
+ result.setRemaining(responseData.getRemainingCount())
+ .setWaitInMs(responseData.getWaitInMs());
+ break;
+ default:
+ RecordLog.warn(
+ "[DefaultClusterTokenClient] Unknown request type: {}", request.getType());
+ }
}
return result;
}
diff --git a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/NettyTransportClient.java b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/NettyTransportClient.java
index e2038b500a..bd3a3dda51 100644
--- a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/NettyTransportClient.java
+++ b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/NettyTransportClient.java
@@ -1,270 +1,272 @@
-/*
- * Copyright 1999-2018 Alibaba Group Holding Ltd.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.csp.sentinel.cluster.client;
-
-import java.util.AbstractMap.SimpleEntry;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import com.alibaba.csp.sentinel.cluster.ClusterErrorMessages;
-import com.alibaba.csp.sentinel.cluster.ClusterTransportClient;
-import com.alibaba.csp.sentinel.cluster.client.codec.netty.NettyRequestEncoder;
-import com.alibaba.csp.sentinel.cluster.client.codec.netty.NettyResponseDecoder;
-import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientConfigManager;
-import com.alibaba.csp.sentinel.cluster.client.handler.TokenClientHandler;
-import com.alibaba.csp.sentinel.cluster.client.handler.TokenClientPromiseHolder;
-import com.alibaba.csp.sentinel.cluster.exception.SentinelClusterException;
-import com.alibaba.csp.sentinel.cluster.request.ClusterRequest;
-import com.alibaba.csp.sentinel.cluster.request.Request;
-import com.alibaba.csp.sentinel.cluster.response.ClusterResponse;
-import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory;
-import com.alibaba.csp.sentinel.log.RecordLog;
-import com.alibaba.csp.sentinel.util.AssertUtil;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import io.netty.handler.codec.LengthFieldPrepender;
-import io.netty.util.concurrent.GenericFutureListener;
-
-/**
- * Netty transport client implementation for Sentinel cluster transport.
- *
- * @author Eric Zhao
- * @since 1.4.0
- */
-public class NettyTransportClient implements ClusterTransportClient {
-
- @SuppressWarnings("PMD.ThreadPoolCreationRule")
- private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1,
- new NamedThreadFactory("sentinel-cluster-transport-client-scheduler"));
-
- public static final int RECONNECT_DELAY_MS = 2000;
-
- private final String host;
- private final int port;
-
- private Channel channel;
- private NioEventLoopGroup eventLoopGroup;
- private TokenClientHandler clientHandler;
-
- private final AtomicInteger idGenerator = new AtomicInteger(0);
- private final AtomicInteger currentState = new AtomicInteger(ClientConstants.CLIENT_STATUS_OFF);
- private final AtomicInteger failConnectedTime = new AtomicInteger(0);
-
- private final AtomicBoolean shouldRetry = new AtomicBoolean(true);
-
- public NettyTransportClient(String host, int port) {
- AssertUtil.assertNotBlank(host, "remote host cannot be blank");
- AssertUtil.isTrue(port > 0, "port should be positive");
- this.host = host;
- this.port = port;
- }
-
- private Bootstrap initClientBootstrap() {
- Bootstrap b = new Bootstrap();
- eventLoopGroup = new NioEventLoopGroup();
- b.group(eventLoopGroup)
- .channel(NioSocketChannel.class)
- .option(ChannelOption.TCP_NODELAY, true)
- .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
- .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, ClusterClientConfigManager.getConnectTimeout())
- .handler(new ChannelInitializer() {
- @Override
- public void initChannel(SocketChannel ch) throws Exception {
- clientHandler = new TokenClientHandler(currentState, disconnectCallback);
-
- ChannelPipeline pipeline = ch.pipeline();
- pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
- pipeline.addLast(new NettyResponseDecoder());
- pipeline.addLast(new LengthFieldPrepender(2));
- pipeline.addLast(new NettyRequestEncoder());
- pipeline.addLast(clientHandler);
- }
- });
-
- return b;
- }
-
- private void connect(Bootstrap b) {
- if (currentState.compareAndSet(ClientConstants.CLIENT_STATUS_OFF, ClientConstants.CLIENT_STATUS_PENDING)) {
- b.connect(host, port)
- .addListener(new GenericFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) {
- if (future.cause() != null) {
- RecordLog.warn(
- String.format("[NettyTransportClient] Could not connect to <%s:%d> after %d times",
- host, port, failConnectedTime.get()), future.cause());
- failConnectedTime.incrementAndGet();
- channel = null;
- } else {
- failConnectedTime.set(0);
- channel = future.channel();
- RecordLog.info(
- "[NettyTransportClient] Successfully connect to server <" + host + ":" + port + ">");
- }
- }
- });
- }
- }
-
- private Runnable disconnectCallback = new Runnable() {
- @Override
- public void run() {
- if (!shouldRetry.get()) {
- return;
- }
- SCHEDULER.schedule(new Runnable() {
- @Override
- public void run() {
- if (shouldRetry.get()) {
- RecordLog.info("[NettyTransportClient] Reconnecting to server <" + host + ":" + port + ">");
- try {
- startInternal();
- } catch (Exception e) {
- RecordLog.warn("[NettyTransportClient] Failed to reconnect to server", e);
- }
- }
- }
- }, RECONNECT_DELAY_MS * (failConnectedTime.get() + 1), TimeUnit.MILLISECONDS);
- cleanUp();
- }
- };
-
- @Override
- public void start() throws Exception {
- shouldRetry.set(true);
- startInternal();
- }
-
- private void startInternal() {
- connect(initClientBootstrap());
- }
-
- private void cleanUp() {
- if (channel != null) {
- channel.close();
- channel = null;
- }
- if (eventLoopGroup != null) {
- eventLoopGroup.shutdownGracefully();
- }
- }
-
- @Override
- public void stop() throws Exception {
- // Stop retrying for connection.
- shouldRetry.set(false);
-
- while (currentState.get() == ClientConstants.CLIENT_STATUS_PENDING) {
- try {
- Thread.sleep(200);
- } catch (Exception ex) {
- // Ignore.
- }
- }
-
- cleanUp();
- failConnectedTime.set(0);
-
- RecordLog.info("[NettyTransportClient] Cluster transport client stopped");
- }
-
- private boolean validRequest(Request request) {
- return request != null && request.getType() >= 0;
- }
-
- @Override
- public boolean isReady() {
- return channel != null && clientHandler != null && clientHandler.hasStarted();
- }
-
- @Override
- public ClusterResponse sendRequest(ClusterRequest request) throws Exception {
- if (!isReady()) {
- throw new SentinelClusterException(ClusterErrorMessages.CLIENT_NOT_READY);
- }
- if (!validRequest(request)) {
- throw new SentinelClusterException(ClusterErrorMessages.BAD_REQUEST);
- }
- int xid = getCurrentId();
- try {
- request.setId(xid);
-
- channel.writeAndFlush(request);
-
- ChannelPromise promise = channel.newPromise();
- TokenClientPromiseHolder.putPromise(xid, promise);
-
- if (!promise.await(ClusterClientConfigManager.getRequestTimeout())) {
- throw new SentinelClusterException(ClusterErrorMessages.REQUEST_TIME_OUT);
- }
-
- SimpleEntry entry = TokenClientPromiseHolder.getEntry(xid);
- if (entry == null || entry.getValue() == null) {
- // Should not go through here.
- throw new SentinelClusterException(ClusterErrorMessages.UNEXPECTED_STATUS);
- }
- return entry.getValue();
- } finally {
- TokenClientPromiseHolder.remove(xid);
- }
- }
-
- private int getCurrentId() {
- if (idGenerator.get() > MAX_ID) {
- idGenerator.set(0);
- }
- return idGenerator.incrementAndGet();
- }
-
- /*public CompletableFuture sendRequestAsync(ClusterRequest request) {
- // Uncomment this when min target JDK is 1.8.
- if (!validRequest(request)) {
- return CompletableFuture.failedFuture(new IllegalArgumentException("Bad request"));
- }
- int xid = getCurrentId();
- request.setId(xid);
-
- CompletableFuture future = new CompletableFuture<>();
- channel.writeAndFlush(request)
- .addListener(f -> {
- if (f.isSuccess()) {
- future.complete(someResult);
- } else if (f.cause() != null) {
- future.completeExceptionally(f.cause());
- } else {
- future.cancel(false);
- }
- });
- return future;
- }*/
-
- private static final int MAX_ID = 999_999_999;
-}
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.cluster.client;
+
+import java.util.AbstractMap.SimpleEntry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.alibaba.csp.sentinel.cluster.ClusterErrorMessages;
+import com.alibaba.csp.sentinel.cluster.ClusterTransportClient;
+import com.alibaba.csp.sentinel.cluster.client.codec.netty.NettyRequestEncoder;
+import com.alibaba.csp.sentinel.cluster.client.codec.netty.NettyResponseDecoder;
+import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientConfigManager;
+import com.alibaba.csp.sentinel.cluster.client.handler.TokenClientHandler;
+import com.alibaba.csp.sentinel.cluster.client.handler.TokenClientPromiseHolder;
+import com.alibaba.csp.sentinel.cluster.exception.SentinelClusterException;
+import com.alibaba.csp.sentinel.cluster.request.ClusterRequest;
+import com.alibaba.csp.sentinel.cluster.request.Request;
+import com.alibaba.csp.sentinel.cluster.response.ClusterResponse;
+import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory;
+import com.alibaba.csp.sentinel.log.RecordLog;
+import com.alibaba.csp.sentinel.util.AssertUtil;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.LengthFieldPrepender;
+import io.netty.util.concurrent.GenericFutureListener;
+
+/**
+ * Netty transport client implementation for Sentinel cluster transport.
+ *
+ * @author Eric Zhao
+ * @since 1.4.0
+ */
+public class NettyTransportClient implements ClusterTransportClient {
+
+ @SuppressWarnings("PMD.ThreadPoolCreationRule")
+ private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1,
+ new NamedThreadFactory("sentinel-cluster-transport-client-scheduler"));
+
+ public static final int RECONNECT_DELAY_MS = 2000;
+
+ private final String host;
+ private final int port;
+
+ private Channel channel;
+ private NioEventLoopGroup eventLoopGroup;
+ private TokenClientHandler clientHandler;
+
+ private final AtomicInteger idGenerator = new AtomicInteger(0);
+ private final AtomicInteger currentState = new AtomicInteger(ClientConstants.CLIENT_STATUS_OFF);
+ private final AtomicInteger failConnectedTime = new AtomicInteger(0);
+
+ private final AtomicBoolean shouldRetry = new AtomicBoolean(true);
+
+ public NettyTransportClient(String host, int port) {
+ AssertUtil.assertNotBlank(host, "remote host cannot be blank");
+ AssertUtil.isTrue(port > 0, "port should be positive");
+ this.host = host;
+ this.port = port;
+ }
+
+ private Bootstrap initClientBootstrap() {
+ Bootstrap b = new Bootstrap();
+ eventLoopGroup = new NioEventLoopGroup();
+ b.group(eventLoopGroup)
+ .channel(NioSocketChannel.class)
+ .option(ChannelOption.TCP_NODELAY, true)
+ .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, ClusterClientConfigManager.getConnectTimeout())
+ .handler(new ChannelInitializer() {
+ @Override
+ public void initChannel(SocketChannel ch) throws Exception {
+ clientHandler = new TokenClientHandler(currentState, disconnectCallback);
+
+ ChannelPipeline pipeline = ch.pipeline();
+ pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
+ pipeline.addLast(new NettyResponseDecoder());
+ pipeline.addLast(new LengthFieldPrepender(2));
+ pipeline.addLast(new NettyRequestEncoder());
+ pipeline.addLast(clientHandler);
+ }
+ });
+
+ return b;
+ }
+
+ private void connect(Bootstrap b) {
+ if (currentState.compareAndSet(ClientConstants.CLIENT_STATUS_OFF, ClientConstants.CLIENT_STATUS_PENDING)) {
+ b.connect(host, port)
+ .addListener(new GenericFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) {
+ if (future.cause() != null) {
+ RecordLog.warn(
+ String.format("[NettyTransportClient] Could not connect to <%s:%d> after %d times",
+ host, port, failConnectedTime.get()), future.cause());
+ failConnectedTime.incrementAndGet();
+ channel = null;
+ } else {
+ failConnectedTime.set(0);
+ channel = future.channel();
+ RecordLog.info(
+ "[NettyTransportClient] Successfully connect to server <" + host + ":" + port + ">");
+ }
+ }
+ });
+ }
+ }
+
+ private Runnable disconnectCallback = new Runnable() {
+ @Override
+ public void run() {
+ if (!shouldRetry.get()) {
+ return;
+ }
+ SCHEDULER.schedule(new Runnable() {
+ @Override
+ public void run() {
+ if (shouldRetry.get()) {
+ RecordLog.info("[NettyTransportClient] Reconnecting to server <" + host + ":" + port + ">");
+ try {
+ startInternal();
+ } catch (Exception e) {
+ RecordLog.warn("[NettyTransportClient] Failed to reconnect to server", e);
+ }
+ }
+ }
+ }, RECONNECT_DELAY_MS * (failConnectedTime.get() + 1), TimeUnit.MILLISECONDS);
+ cleanUp();
+ }
+ };
+
+ @Override
+ public void start() throws Exception {
+ shouldRetry.set(true);
+ startInternal();
+ }
+
+ private void startInternal() {
+ connect(initClientBootstrap());
+ }
+
+ private void cleanUp() {
+ if (channel != null) {
+ channel.close();
+ channel = null;
+ }
+ if (eventLoopGroup != null) {
+ eventLoopGroup.shutdownGracefully();
+ }
+ }
+
+ @Override
+ public void stop() throws Exception {
+ // Stop retrying for connection.
+ shouldRetry.set(false);
+
+ while (currentState.get() == ClientConstants.CLIENT_STATUS_PENDING) {
+ try {
+ Thread.sleep(200);
+ } catch (Exception ex) {
+ // Ignore.
+ }
+ }
+
+ cleanUp();
+ failConnectedTime.set(0);
+
+ RecordLog.info("[NettyTransportClient] Cluster transport client stopped");
+ }
+
+ private boolean validRequest(Request request) {
+ return request != null && request.getType() >= 0;
+ }
+
+ @Override
+ public boolean isReady() {
+ return channel != null && clientHandler != null && clientHandler.hasStarted();
+ }
+
+ @Override
+ public ClusterResponse sendRequest(ClusterRequest request) throws Exception {
+ if (!isReady()) {
+ throw new SentinelClusterException(ClusterErrorMessages.CLIENT_NOT_READY);
+ }
+ if (!validRequest(request)) {
+ throw new SentinelClusterException(ClusterErrorMessages.BAD_REQUEST);
+ }
+ int xid = getCurrentId();
+ try {
+ request.setId(xid);
+
+ channel.writeAndFlush(request);
+
+ ChannelPromise promise = channel.newPromise();
+ TokenClientPromiseHolder.putPromise(xid, promise);
+
+ if (!promise.await(ClusterClientConfigManager.getRequestTimeout())) {
+ throw new SentinelClusterException(ClusterErrorMessages.REQUEST_TIME_OUT);
+ }
+
+ SimpleEntry entry = TokenClientPromiseHolder.getEntry(xid);
+ if (entry == null || entry.getValue() == null) {
+ // Should not go through here.
+ throw new SentinelClusterException(ClusterErrorMessages.UNEXPECTED_STATUS);
+ }
+ return entry.getValue();
+ } finally {
+ TokenClientPromiseHolder.remove(xid);
+ }
+ }
+
+ private int getCurrentId() {
+ if (idGenerator.get() > MAX_ID) {
+ idGenerator.set(0);
+ }
+ return idGenerator.incrementAndGet();
+ }
+
+
+// public CompletableFuture sendRequestAsync(ClusterRequest request) throws Exception{
+// // Uncomment this when min target JDK is 1.8.
+// if (!validRequest(request)) {
+// throw new SentinelClusterException(ClusterErrorMessages.BAD_REQUEST);
+// }
+// int xid = getCurrentId();
+// request.setId(xid);
+//
+// CompletableFuture future = new CompletableFuture<>();
+// channel.writeAndFlush(request)
+// .addListener(f -> {
+// if (f.isSuccess()) {
+// future.complete(new ClusterResponse().setStatus(1));
+// } else if (f.cause() != null) {
+// future.completeExceptionally(f.cause());
+// } else {
+// future.cancel(false);
+// }
+// });
+// return future;
+// }
+
+ private static final int MAX_ID = 999_999_999;
+}
diff --git a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/data/ConcurrentFlowAcquireRequestDataWriter.java b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/data/ConcurrentFlowAcquireRequestDataWriter.java
new file mode 100644
index 0000000000..c04708aff1
--- /dev/null
+++ b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/data/ConcurrentFlowAcquireRequestDataWriter.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.cluster.client.codec.data;
+
+import com.alibaba.csp.sentinel.cluster.codec.EntityWriter;
+import com.alibaba.csp.sentinel.cluster.request.data.ConcurrentFlowAcquireRequestData;
+import io.netty.buffer.ByteBuf;
+
+/**
+ * +-------------------+--------------+----------------+---------------+
+ * | RequestID(8 byte) | Type(1 byte) | FlowID(8 byte) | Count(4 byte) |
+ * +-------------------+--------------+----------------+---------------+
+ *
+ * @author yunfeiyanggzq
+ * @Date 2020/7/9 10:01
+ */
+public class ConcurrentFlowAcquireRequestDataWriter implements EntityWriter {
+ @Override
+ public void writeTo(ConcurrentFlowAcquireRequestData entity, ByteBuf target) {
+ target.writeLong(entity.getFlowId());
+ target.writeInt(entity.getCount());
+ }
+}
diff --git a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/data/ConcurrentFlowAcquireResponseDataDecoder.java b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/data/ConcurrentFlowAcquireResponseDataDecoder.java
new file mode 100644
index 0000000000..abbe10ee61
--- /dev/null
+++ b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/data/ConcurrentFlowAcquireResponseDataDecoder.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.cluster.client.codec.data;
+
+import com.alibaba.csp.sentinel.cluster.codec.EntityDecoder;
+import com.alibaba.csp.sentinel.cluster.response.data.ConcurrentFlowAcquireResponseData;
+import io.netty.buffer.ByteBuf;
+
+/**
+ * @author yunfeiyanggzq
+ * @Date 2020/7/9 10:02
+ */
+public class ConcurrentFlowAcquireResponseDataDecoder implements EntityDecoder {
+
+ @Override
+ public ConcurrentFlowAcquireResponseData decode(ByteBuf source) {
+ ConcurrentFlowAcquireResponseData data = new ConcurrentFlowAcquireResponseData();
+ data.setTokenId(source.readLong());
+ return data;
+ }
+}
diff --git a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/data/ConcurrentFlowReleaseRequestDataWriter.java b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/data/ConcurrentFlowReleaseRequestDataWriter.java
new file mode 100644
index 0000000000..287fcb01a8
--- /dev/null
+++ b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/data/ConcurrentFlowReleaseRequestDataWriter.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.cluster.client.codec.data;
+
+import com.alibaba.csp.sentinel.cluster.codec.EntityWriter;
+import com.alibaba.csp.sentinel.cluster.request.data.ConcurrentFlowAcquireRequestData;
+import com.alibaba.csp.sentinel.cluster.request.data.ConcurrentFlowReleaseRequestData;
+import io.netty.buffer.ByteBuf;
+
+/**
+ * +-------------------+--------------+------------------+
+ * | RequestID(8 byte) | Type(1 byte) | TokenID(8 byte) |
+ * +-------------------+--------------+-----------------+
+ *
+ * @author yunfeiyanggzq
+ * @Date 2020/7/9 11:52
+ */
+public class ConcurrentFlowReleaseRequestDataWriter implements EntityWriter {
+ @Override
+ public void writeTo(ConcurrentFlowReleaseRequestData entity, ByteBuf target) {
+ target.writeLong(entity.getTokenId());
+ }
+}
diff --git a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/data/ConcurrentFlowReleaseResponseDataDecoder.java b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/data/ConcurrentFlowReleaseResponseDataDecoder.java
new file mode 100644
index 0000000000..d9a526027b
--- /dev/null
+++ b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/data/ConcurrentFlowReleaseResponseDataDecoder.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.cluster.client.codec.data;
+
+import com.alibaba.csp.sentinel.cluster.codec.EntityDecoder;
+import com.alibaba.csp.sentinel.cluster.response.data.ConcurrentFlowReleaseResponseData;
+import io.netty.buffer.ByteBuf;
+
+/**
+ * @author yunfeiyanggzq
+ * @Date 2020/7/9 11:53
+ */
+public class ConcurrentFlowReleaseResponseDataDecoder implements EntityDecoder {
+ @Override
+ public ConcurrentFlowReleaseResponseData decode(ByteBuf source) {
+ return new ConcurrentFlowReleaseResponseData();
+ }
+}
diff --git a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/data/FlowRequestDataWriter.java b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/data/FlowRequestDataWriter.java
index 7c8acc81f7..a134d279b1 100644
--- a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/data/FlowRequestDataWriter.java
+++ b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/data/FlowRequestDataWriter.java
@@ -22,7 +22,7 @@
/**
* +-------------------+--------------+----------------+---------------+------------------+
- * | RequestID(8 byte) | Type(1 byte) | FlowID(8 byte) | Count(4 byte) | PriorityFlag (1) |
+ * | RequestID(8 byte) | Type(1 byte) | FlowID(4 byte) | Count(4 byte) | PriorityFlag (1) |
* +-------------------+--------------+----------------+---------------+------------------+
*
* @author Eric Zhao
diff --git a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/init/DefaultClusterClientInitFunc.java b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/init/DefaultClusterClientInitFunc.java
index 5b9467c7d5..65dfff4194 100644
--- a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/init/DefaultClusterClientInitFunc.java
+++ b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/init/DefaultClusterClientInitFunc.java
@@ -16,11 +16,7 @@
package com.alibaba.csp.sentinel.cluster.client.init;
import com.alibaba.csp.sentinel.cluster.client.ClientConstants;
-import com.alibaba.csp.sentinel.cluster.client.codec.data.FlowRequestDataWriter;
-import com.alibaba.csp.sentinel.cluster.client.codec.data.FlowResponseDataDecoder;
-import com.alibaba.csp.sentinel.cluster.client.codec.data.ParamFlowRequestDataWriter;
-import com.alibaba.csp.sentinel.cluster.client.codec.data.PingRequestDataWriter;
-import com.alibaba.csp.sentinel.cluster.client.codec.data.PingResponseDataDecoder;
+import com.alibaba.csp.sentinel.cluster.client.codec.data.*;
import com.alibaba.csp.sentinel.cluster.client.codec.registry.RequestDataWriterRegistry;
import com.alibaba.csp.sentinel.cluster.client.codec.registry.ResponseDataDecodeRegistry;
import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientStartUpConfig;
@@ -43,6 +39,9 @@ public void init() throws Exception {
private void initDefaultEntityWriters() {
RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_PING, new PingRequestDataWriter());
RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_FLOW, new FlowRequestDataWriter());
+ RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_CONCURRENT_FLOW_ACQUIRE, new ConcurrentFlowAcquireRequestDataWriter());
+ RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_CONCURRENT_FLOW_RELEASE, new ConcurrentFlowReleaseRequestDataWriter());
+
Integer maxParamByteSize = ClusterClientStartUpConfig.getMaxParamByteSize();
if (maxParamByteSize == null) {
RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_PARAM_FLOW, new ParamFlowRequestDataWriter());
@@ -55,5 +54,8 @@ private void initDefaultEntityDecoders() {
ResponseDataDecodeRegistry.addDecoder(ClientConstants.TYPE_PING, new PingResponseDataDecoder());
ResponseDataDecodeRegistry.addDecoder(ClientConstants.TYPE_FLOW, new FlowResponseDataDecoder());
ResponseDataDecodeRegistry.addDecoder(ClientConstants.TYPE_PARAM_FLOW, new FlowResponseDataDecoder());
+ ResponseDataDecodeRegistry.addDecoder(ClientConstants.TYPE_CONCURRENT_FLOW_ACQUIRE, new ConcurrentFlowAcquireResponseDataDecoder());
+ ResponseDataDecodeRegistry.addDecoder(ClientConstants.TYPE_CONCURRENT_FLOW_RELEASE, new ConcurrentFlowReleaseResponseDataDecoder());
+
}
}
diff --git a/sentinel-cluster/sentinel-cluster-common-default/pom.xml b/sentinel-cluster/sentinel-cluster-common-default/pom.xml
index a4c3ae1ef5..70f3d8318a 100644
--- a/sentinel-cluster/sentinel-cluster-common-default/pom.xml
+++ b/sentinel-cluster/sentinel-cluster-common-default/pom.xml
@@ -1,22 +1,34 @@
-
-
-
- sentinel-cluster
- com.alibaba.csp
- 1.8.0-SNAPSHOT
-
- 4.0.0
-
- sentinel-cluster-common-default
- jar
-
-
-
- com.alibaba.csp
- sentinel-core
-
-
-
+
+
+
+ sentinel-cluster
+ com.alibaba.csp
+ 1.8.0-SNAPSHOT
+
+ 4.0.0
+
+ sentinel-cluster-common-default
+ jar
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+ 8
+ 8
+
+
+
+
+
+
+
+ com.alibaba.csp
+ sentinel-core
+
+
+
\ No newline at end of file
diff --git a/sentinel-cluster/sentinel-cluster-common-default/src/main/java/com/alibaba/csp/sentinel/cluster/ClusterConstants.java b/sentinel-cluster/sentinel-cluster-common-default/src/main/java/com/alibaba/csp/sentinel/cluster/ClusterConstants.java
index c523413dd6..0930fda9bf 100644
--- a/sentinel-cluster/sentinel-cluster-common-default/src/main/java/com/alibaba/csp/sentinel/cluster/ClusterConstants.java
+++ b/sentinel-cluster/sentinel-cluster-common-default/src/main/java/com/alibaba/csp/sentinel/cluster/ClusterConstants.java
@@ -24,6 +24,9 @@ public final class ClusterConstants {
public static final int MSG_TYPE_PING = 0;
public static final int MSG_TYPE_FLOW = 1;
public static final int MSG_TYPE_PARAM_FLOW = 2;
+ public static final int MSG_TYPE_CONCURRENT_FLOW_ACQUIRE = 3;
+ public static final int MSG_TYPE_CONCURRENT_FLOW_RELEASE = 4;
+
public static final int RESPONSE_STATUS_BAD = -1;
public static final int RESPONSE_STATUS_OK = 0;
diff --git a/sentinel-cluster/sentinel-cluster-common-default/src/main/java/com/alibaba/csp/sentinel/cluster/ClusterTransportClient.java b/sentinel-cluster/sentinel-cluster-common-default/src/main/java/com/alibaba/csp/sentinel/cluster/ClusterTransportClient.java
index 23ba90146b..a4f9d0eb76 100644
--- a/sentinel-cluster/sentinel-cluster-common-default/src/main/java/com/alibaba/csp/sentinel/cluster/ClusterTransportClient.java
+++ b/sentinel-cluster/sentinel-cluster-common-default/src/main/java/com/alibaba/csp/sentinel/cluster/ClusterTransportClient.java
@@ -1,58 +1,59 @@
-/*
- * Copyright 1999-2018 Alibaba Group Holding Ltd.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.csp.sentinel.cluster;
-
-import com.alibaba.csp.sentinel.cluster.request.ClusterRequest;
-import com.alibaba.csp.sentinel.cluster.response.ClusterResponse;
-
-/**
- * Synchronous transport client for distributed flow control.
- *
- * @author Eric Zhao
- * @since 1.4.0
- */
-public interface ClusterTransportClient {
-
- /**
- * Start the client.
- *
- * @throws Exception some error occurred (e.g. initialization failed)
- */
- void start() throws Exception;
-
- /**
- * Stop the client.
- *
- * @throws Exception some error occurred (e.g. shutdown failed)
- */
- void stop() throws Exception;
-
- /**
- * Send request to remote server and get response.
- *
- * @param request Sentinel cluster request
- * @return response from remote server
- * @throws Exception some error occurs
- */
- ClusterResponse sendRequest(ClusterRequest request) throws Exception;
-
- /**
- * Check whether the client has been started and ready for sending requests.
- *
- * @return true if the client is ready to send requests, otherwise false
- */
- boolean isReady();
-}
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.cluster;
+
+import com.alibaba.csp.sentinel.cluster.request.ClusterRequest;
+import com.alibaba.csp.sentinel.cluster.response.ClusterResponse;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Synchronous transport client for distributed flow control.
+ *
+ * @author Eric Zhao
+ * @since 1.4.0
+ */
+public interface ClusterTransportClient {
+
+ /**
+ * Start the client.
+ *
+ * @throws Exception some error occurred (e.g. initialization failed)
+ */
+ void start() throws Exception;
+
+ /**
+ * Stop the client.
+ *
+ * @throws Exception some error occurred (e.g. shutdown failed)
+ */
+ void stop() throws Exception;
+
+ /**
+ * Send request to remote server and get response.
+ *
+ * @param request Sentinel cluster request
+ * @return response from remote server
+ * @throws Exception some error occurs
+ */
+ ClusterResponse sendRequest(ClusterRequest request) throws Exception;
+ /**
+ * Check whether the client has been started and ready for sending requests.
+ *
+ * @return true if the client is ready to send requests, otherwise false
+ */
+ boolean isReady();
+}
diff --git a/sentinel-cluster/sentinel-cluster-common-default/src/main/java/com/alibaba/csp/sentinel/cluster/request/data/ConcurrentFlowAcquireRequestData.java b/sentinel-cluster/sentinel-cluster-common-default/src/main/java/com/alibaba/csp/sentinel/cluster/request/data/ConcurrentFlowAcquireRequestData.java
new file mode 100644
index 0000000000..fafdd09d81
--- /dev/null
+++ b/sentinel-cluster/sentinel-cluster-common-default/src/main/java/com/alibaba/csp/sentinel/cluster/request/data/ConcurrentFlowAcquireRequestData.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.cluster.request.data;
+
+/**
+ * @author yunfeiyanggzq
+ */
+public class ConcurrentFlowAcquireRequestData {
+ private long flowId;
+
+ private int count;
+
+ public long getFlowId() {
+ return flowId;
+ }
+
+ public ConcurrentFlowAcquireRequestData setFlowId(long flowId) {
+ this.flowId = flowId;
+ return this;
+ }
+
+ public int getCount() {
+ return count;
+ }
+
+ public ConcurrentFlowAcquireRequestData setCount(int count) {
+ this.count = count;
+ return this;
+ }
+}
diff --git a/sentinel-cluster/sentinel-cluster-common-default/src/main/java/com/alibaba/csp/sentinel/cluster/request/data/ConcurrentFlowReleaseRequestData.java b/sentinel-cluster/sentinel-cluster-common-default/src/main/java/com/alibaba/csp/sentinel/cluster/request/data/ConcurrentFlowReleaseRequestData.java
new file mode 100644
index 0000000000..193ddc9dd9
--- /dev/null
+++ b/sentinel-cluster/sentinel-cluster-common-default/src/main/java/com/alibaba/csp/sentinel/cluster/request/data/ConcurrentFlowReleaseRequestData.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.cluster.request.data;
+
+/**
+ * @author yunfeiyanggzq
+ */
+public class ConcurrentFlowReleaseRequestData {
+
+ private long tokenId;
+
+ public long getTokenId() {
+ return tokenId;
+ }
+
+ public ConcurrentFlowReleaseRequestData setTokenId(long tokenId) {
+ this.tokenId = tokenId;
+ return this;
+ }
+}
diff --git a/sentinel-cluster/sentinel-cluster-common-default/src/main/java/com/alibaba/csp/sentinel/cluster/response/data/ConcurrentFlowAcquireResponseData.java b/sentinel-cluster/sentinel-cluster-common-default/src/main/java/com/alibaba/csp/sentinel/cluster/response/data/ConcurrentFlowAcquireResponseData.java
new file mode 100644
index 0000000000..8105a65171
--- /dev/null
+++ b/sentinel-cluster/sentinel-cluster-common-default/src/main/java/com/alibaba/csp/sentinel/cluster/response/data/ConcurrentFlowAcquireResponseData.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.cluster.response.data;
+
+/**
+ * @author yunfeiyanggzq
+ */
+public class ConcurrentFlowAcquireResponseData {
+ private long tokenId;
+
+ public long getTokenId() {
+ return tokenId;
+ }
+
+ public ConcurrentFlowAcquireResponseData setTokenId(long tokenId) {
+ this.tokenId = tokenId;
+ return this;
+ }
+}
diff --git a/sentinel-cluster/sentinel-cluster-common-default/src/main/java/com/alibaba/csp/sentinel/cluster/response/data/ConcurrentFlowReleaseResponseData.java b/sentinel-cluster/sentinel-cluster-common-default/src/main/java/com/alibaba/csp/sentinel/cluster/response/data/ConcurrentFlowReleaseResponseData.java
new file mode 100644
index 0000000000..2e2e39ba88
--- /dev/null
+++ b/sentinel-cluster/sentinel-cluster-common-default/src/main/java/com/alibaba/csp/sentinel/cluster/response/data/ConcurrentFlowReleaseResponseData.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.cluster.response.data;
+
+/**
+ * @author yunfeiyanggzq
+ */
+public class ConcurrentFlowReleaseResponseData{
+}
diff --git a/sentinel-cluster/sentinel-cluster-server-default/pom.xml b/sentinel-cluster/sentinel-cluster-server-default/pom.xml
index b222431b0e..fd8111b997 100644
--- a/sentinel-cluster/sentinel-cluster-server-default/pom.xml
+++ b/sentinel-cluster/sentinel-cluster-server-default/pom.xml
@@ -61,10 +61,26 @@
mockito-coretest
+
+ org.powermock
+ powermock-module-junit4
+ test
+
+
+ org.powermock
+ powermock-api-mockito2
+ test
+ org.assertjassertj-coretest
-
+
+ org.powermock
+ powermock-core
+ 2.0.0
+ test
+
+
\ No newline at end of file
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterFlowChecker.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterFlowChecker.java
index df69bef9dc..ac7dc6c552 100644
--- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterFlowChecker.java
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterFlowChecker.java
@@ -35,7 +35,7 @@
*/
final class ClusterFlowChecker {
- private static double calcGlobalThreshold(FlowRule rule) {
+ public static double calcGlobalThreshold(FlowRule rule) {
double count = rule.getCount();
switch (rule.getClusterConfig().getThresholdType()) {
case ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL:
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ConcurrentClusterFlowChecker.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ConcurrentClusterFlowChecker.java
new file mode 100644
index 0000000000..c01588dea8
--- /dev/null
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ConcurrentClusterFlowChecker.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.cluster.flow;
+
+import com.alibaba.csp.sentinel.cluster.TokenResult;
+import com.alibaba.csp.sentinel.cluster.TokenResultStatus;
+import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager;
+import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.CurrentConcurrencyManager;
+import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.TokenCacheNode;
+import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.TokenCacheNodeManager;
+import com.alibaba.csp.sentinel.cluster.server.log.ClusterServerStatLogUtil;
+import com.alibaba.csp.sentinel.log.RecordLog;
+import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant;
+import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author yunfeiyanggzq
+ */
+final public class ConcurrentClusterFlowChecker {
+
+ public static double calcGlobalThreshold(FlowRule rule) {
+ double count = rule.getCount();
+ switch (rule.getClusterConfig().getThresholdType()) {
+ case ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL:
+ return count;
+ case ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL:
+ default:
+ int connectedCount = ClusterFlowRuleManager.getConnectedCount(rule.getClusterConfig().getFlowId());
+ return count * connectedCount;
+ }
+ }
+
+ public static TokenResult acquireConcurrentToken(String clientAddress,/*@Valid*/ FlowRule rule, int acquireCount) {
+ long flowId = rule.getClusterConfig().getFlowId();
+ AtomicInteger nowCalls = CurrentConcurrencyManager.get(flowId);
+ if (nowCalls == null) {
+ RecordLog.warn("[ConcurrentClusterFlowChecker] Fail to get nowCalls by flowId<{}>", flowId);
+ return new TokenResult(TokenResultStatus.FAIL);
+ }
+
+ // check before enter the lock to improve the efficiency
+ if (nowCalls.get() + acquireCount > calcGlobalThreshold(rule)) {
+ ClusterServerStatLogUtil.log("concurrent|block|" + flowId, acquireCount);
+ return new TokenResult(TokenResultStatus.BLOCKED);
+ }
+
+ // ensure the atomicity of operations
+ // lock different nowCalls according rule to improve the performance
+ synchronized (nowCalls) {
+ // check again whether the request can pass.
+ if (nowCalls.get() + acquireCount > calcGlobalThreshold(rule)) {
+ ClusterServerStatLogUtil.log("concurrent|block|" + flowId, acquireCount);
+ return new TokenResult(TokenResultStatus.BLOCKED);
+ } else {
+ nowCalls.getAndAdd(acquireCount);
+ }
+ }
+
+ ClusterServerStatLogUtil.log("concurrent|pass|" + flowId, acquireCount);
+ TokenCacheNode node = TokenCacheNode.generateTokenCacheNode(rule, acquireCount, clientAddress);
+ TokenCacheNodeManager.putTokenCacheNode(node.getTokenId(), node);
+ TokenResult tokenResult = new TokenResult(TokenResultStatus.OK);
+ tokenResult.setTokenId(node.getTokenId());
+// System.out.println("成功获取token" + tokenResult.getTokenId());
+ return tokenResult;
+ }
+
+ public static TokenResult releaseConcurrentToken(/*@Valid*/ long tokenId) {
+ TokenCacheNode node = TokenCacheNodeManager.getTokenCacheNode(tokenId);
+ if (node == null) {
+ RecordLog.info("[ConcurrentClusterFlowChecker] Token<{}> is already released", tokenId);
+ return new TokenResult(TokenResultStatus.ALREADY_RELEASE);
+ }
+ FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(node.getFlowId());
+ if (rule == null) {
+ RecordLog.info("[ConcurrentClusterFlowChecker] Fail to get rule by flowId<{}>", node.getFlowId());
+ return new TokenResult(TokenResultStatus.NO_RULE_EXISTS);
+ }
+ if (TokenCacheNodeManager.removeTokenCacheNode(tokenId) == null) {
+ RecordLog.info("[ConcurrentClusterFlowChecker] Token<{}> is already released", tokenId);
+ return new TokenResult(TokenResultStatus.ALREADY_RELEASE);
+ }
+ int acquireCount = node.getAcquireCount();
+ AtomicInteger nowCalls = CurrentConcurrencyManager.get(node.getFlowId());
+ nowCalls.getAndAdd(-1 * acquireCount);
+ rule.getClusterConfig().addReleaseCount(acquireCount);
+ ClusterServerStatLogUtil.log("concurrent|release|" + rule.getClusterConfig().getFlowId(), acquireCount);
+// System.out.println("成功释放token" + node.getTokenId());
+ return new TokenResult(TokenResultStatus.RELEASE_OK);
+ }
+}
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/DefaultTokenService.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/DefaultTokenService.java
index 2953a1350b..a3de0becff 100644
--- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/DefaultTokenService.java
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/DefaultTokenService.java
@@ -15,16 +15,16 @@
*/
package com.alibaba.csp.sentinel.cluster.flow;
-import java.util.Collection;
-
-import com.alibaba.csp.sentinel.cluster.TokenResultStatus;
import com.alibaba.csp.sentinel.cluster.TokenResult;
+import com.alibaba.csp.sentinel.cluster.TokenResultStatus;
import com.alibaba.csp.sentinel.cluster.TokenService;
import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager;
import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterParamFlowRuleManager;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule;
+import java.util.Collection;
+
/**
* Default implementation for cluster {@link TokenService}.
*
@@ -61,10 +61,35 @@ public TokenResult requestParamToken(Long ruleId, int acquireCount, Collection list, /*@Valid*/ String
final ConcurrentHashMap ruleMap = new ConcurrentHashMap<>();
Set flowIdSet = new HashSet<>();
-
for (FlowRule rule : list) {
if (!rule.isClusterMode()) {
continue;
@@ -351,6 +357,9 @@ private static void applyClusterFlowRule(List list, /*@Valid*/ String
ruleMap.put(flowId, rule);
FLOW_NAMESPACE_MAP.put(flowId, namespace);
flowIdSet.add(flowId);
+ if(!CurrentConcurrencyManager.containsFlowId(flowId)){
+ CurrentConcurrencyManager.put(flowId,0);
+ }
// Prepare cluster metric from valid flow ID.
ClusterMetricStatistics.putMetricIfAbsent(flowId,
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/ClusterConcurrentCheckerLogListener.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/ClusterConcurrentCheckerLogListener.java
new file mode 100644
index 0000000000..4b1773b5c8
--- /dev/null
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/ClusterConcurrentCheckerLogListener.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent;
+
+import com.alibaba.csp.sentinel.cluster.flow.ConcurrentClusterFlowChecker;
+import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager;
+import com.alibaba.csp.sentinel.cluster.server.log.ClusterServerStatLogUtil;
+import com.alibaba.csp.sentinel.log.RecordLog;
+import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author yunfeiyanggzq
+ */
+public class ClusterConcurrentCheckerLogListener implements Runnable {
+ @Override
+ public void run() {
+ try {
+ collectInformation();
+ } catch (Exception e) {
+ RecordLog.warn("[ClusterConcurrentCheckerLogListener] Failed to record concurrent flow control regularly", e);
+ }
+ }
+
+ private void collectInformation() {
+ ConcurrentHashMap nowCallsMap = CurrentConcurrencyManager.getConcurrencyMap();
+ for (long flowId : nowCallsMap.keySet()) {
+ FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(flowId);
+ if (rule == null || nowCallsMap.get(flowId).get() == 0) {
+ continue;
+ }
+ double concurrencyLevel = ConcurrentClusterFlowChecker.calcGlobalThreshold(rule);
+ String resource = rule.getResource();
+ ClusterServerStatLogUtil.log("concurrent|resource:" + resource + "|flowId:" + flowId + "|concurrencyLevel:" + concurrencyLevel, nowCallsMap.get(flowId).get());
+ }
+ }
+}
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/CurrentConcurrencyManager.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/CurrentConcurrencyManager.java
new file mode 100644
index 0000000000..0fdfb1e020
--- /dev/null
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/CurrentConcurrencyManager.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent;
+
+import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * We use a ConcurrentHashMap type structure to store nowCalls corresponding to
+ * rules, where the key is flowId and the value is nowCalls. Because nowCalls may be accessed and
+ * modified by multiple threads, we consider to design it as an AtomicInteger class . Each newly
+ * created rule will add a nowCalls object to this map. If the concurrency corresponding to a rule changes,
+ * we will update the corresponding nowCalls in real time. Each request to obtain a token will increase the nowCalls;
+ * and the request to release the token will reduce the nowCalls.
+ *
+ * @author yunfeiyanggzq
+ */
+public final class CurrentConcurrencyManager {
+ /**
+ * use ConcurrentHashMap to store the nowCalls of rules.
+ */
+ private static final ConcurrentHashMap NOW_CALLS_MAP = new ConcurrentHashMap();
+
+ private static ClusterConcurrentCheckerLogListener logTask = null;
+
+ @SuppressWarnings("PMD.ThreadPoolCreationRule")
+ private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1,
+ new NamedThreadFactory("sentinel-cluster-concurrency-record-task", true));
+
+ static {
+ logTask = new ClusterConcurrentCheckerLogListener();
+ SCHEDULER.scheduleAtFixedRate(logTask, 0, 1, TimeUnit.SECONDS);
+ }
+
+ /**
+ * update the nowCalls.
+ */
+ public static Boolean update(Long flowId, Integer count) {
+
+ AtomicInteger nowCalls = NOW_CALLS_MAP.get(flowId);
+ if (nowCalls == null) {
+ return false;
+ }
+ nowCalls.getAndAdd(count);
+ return true;
+ }
+
+ /**
+ * get the nowCalls.
+ */
+ public static AtomicInteger get(Long flowId) {
+ return NOW_CALLS_MAP.get(flowId);
+ }
+
+ /**
+ * delete the nowCalls.
+ */
+ public static void remove(Long flowId) {
+ NOW_CALLS_MAP.remove(flowId);
+ }
+
+ /**
+ * add the nowCalls.
+ */
+ public static void put(Long flowId, Integer nowCalls) {
+ NOW_CALLS_MAP.put(flowId, new AtomicInteger(nowCalls));
+ }
+
+ /**
+ * check flow id.
+ */
+ public static boolean containsFlowId(Long flowId) {
+ return NOW_CALLS_MAP.containsKey(flowId);
+ }
+
+ /**
+ * get NOW_CALLS_MAP.
+ */
+ public static ConcurrentHashMap getConcurrencyMap() {
+ return NOW_CALLS_MAP;
+ }
+}
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/TokenCacheNode.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/TokenCacheNode.java
new file mode 100644
index 0000000000..26852bcdf5
--- /dev/null
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/TokenCacheNode.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent;
+
+import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
+
+import java.util.UUID;
+
+/**
+ * We use TokenCacheNodeManager to store the tokenId, whose the underlying storage structure
+ * is ConcurrentLinkedHashMap, Its storage node is TokenCacheNode. In order to operate the nowCalls value when
+ * the expired tokenId is deleted regularly, we need to store the flowId in TokenCacheNode.
+ *
+ * @author yunfeiyanggzq
+ */
+public class TokenCacheNode {
+ /**
+ * the TokenId of the token
+ */
+ private Long tokenId;
+ /**
+ * the client goes offline detection time
+ */
+ private Long clientTimeout;
+ /**
+ * the resource called over time detection time
+ */
+ private Long resourceTimeout;
+ /**
+ * the flow rule id corresponding to the token
+ */
+ private Long flowId;
+ /**
+ * the number this token occupied
+ */
+ private int acquireCount;
+
+ private String clientAddress;
+
+ public TokenCacheNode() {
+ }
+
+ public static TokenCacheNode generateTokenCacheNode(FlowRule rule, int acquireCount, String clientAddress) {
+ TokenCacheNode node = new TokenCacheNode();
+ node.setTokenId(UUID.randomUUID().getMostSignificantBits());
+ node.setFlowId(rule.getClusterConfig().getFlowId());
+ node.setClientTimeout(rule.getClusterConfig().getClientOfflineTime());
+ node.setResourceTimeout(rule.getClusterConfig().getResourceTimeout());
+ node.setAcquireCount(acquireCount);
+ node.setClientAddress(clientAddress);
+ return node;
+ }
+
+ public Long getTokenId() {
+ return tokenId;
+ }
+
+ public void setTokenId(Long tokenId) {
+ this.tokenId = tokenId;
+ }
+
+ public Long getClientTimeout() {
+ return clientTimeout;
+ }
+
+ public void setClientTimeout(Long clientTimeout) {
+ this.clientTimeout = clientTimeout + System.currentTimeMillis();
+ }
+
+ public Long getResourceTimeout() {
+ return this.resourceTimeout;
+ }
+
+ public void setResourceTimeout(Long resourceTimeout) {
+ this.resourceTimeout = resourceTimeout + System.currentTimeMillis();
+ }
+
+ public Long getFlowId() {
+ return flowId;
+ }
+
+ public void setFlowId(Long flowId) {
+ this.flowId = flowId;
+ }
+
+ public int getAcquireCount() {
+ return acquireCount;
+ }
+
+ public void setAcquireCount(int acquireCount) {
+ this.acquireCount = acquireCount;
+ }
+
+ public String getClientAddress() {
+ return clientAddress;
+ }
+
+ public void setClientAddress(String clientAddress) {
+ this.clientAddress = clientAddress;
+ }
+
+ @Override
+ public String toString() {
+ return "TokenCacheNode{" +
+ "tokenId=" + tokenId +
+ ", clientTimeout=" + clientTimeout +
+ ", resourceTimeout=" + resourceTimeout +
+ ", flowId=" + flowId +
+ ", acquireCount=" + acquireCount +
+ ", clientAddress='" + clientAddress + '\'' +
+ '}';
+ }
+}
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/TokenCacheNodeManager.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/TokenCacheNodeManager.java
new file mode 100644
index 0000000000..84f4225f5a
--- /dev/null
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/TokenCacheNodeManager.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent;
+
+import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.expire.ExpireStrategy;
+import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.expire.RegularExpireStrategy;
+import com.alibaba.csp.sentinel.log.RecordLog;
+import com.alibaba.csp.sentinel.util.AssertUtil;
+import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
+import com.googlecode.concurrentlinkedhashmap.Weighers;
+
+/**
+ * @author yunfeiyanggzq
+ */
+public class TokenCacheNodeManager {
+ private static ConcurrentLinkedHashMap TOKEN_CACHE_NODE_MAP;
+ /**
+ * the strategy of removing expired token
+ */
+ private static ExpireStrategy expireStrategy;
+
+
+ private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
+ private static final int DEFAULT_CAPACITY = Integer.MAX_VALUE;
+
+ static {
+ ExpireStrategy expireStrategy = new RegularExpireStrategy();
+ prepare(DEFAULT_CONCURRENCY_LEVEL, DEFAULT_CAPACITY, expireStrategy);
+ }
+
+ public static void prepare(int concurrencyLevel, int maximumWeightedCapacity, ExpireStrategy strategy) {
+ AssertUtil.isTrue(concurrencyLevel > 0, "concurrencyLevel must be positive");
+ AssertUtil.isTrue(maximumWeightedCapacity > 0, "maximumWeightedCapacity must be positive");
+ AssertUtil.isTrue(strategy != null, "expireStrategy can;t be null");
+
+ TOKEN_CACHE_NODE_MAP = new ConcurrentLinkedHashMap.Builder()
+ .concurrencyLevel(concurrencyLevel)
+ .maximumWeightedCapacity(maximumWeightedCapacity)
+ .weigher(Weighers.singleton())
+ .build();
+ // Start the task of regularly clearing expired keys
+ expireStrategy = strategy;
+ expireStrategy.removeExpireKey(TOKEN_CACHE_NODE_MAP);
+ }
+
+
+ public static TokenCacheNode getTokenCacheNode(long tokenId) {
+ return TOKEN_CACHE_NODE_MAP.get(tokenId);
+ }
+
+ public static void putTokenCacheNode(long tokenId, TokenCacheNode cacheNode) {
+ TOKEN_CACHE_NODE_MAP.put(tokenId, cacheNode);
+ }
+
+ public static boolean isContainsTokenId(long tokenId) {
+ return TOKEN_CACHE_NODE_MAP.containsKey(tokenId);
+ }
+
+ public static TokenCacheNode removeTokenCacheNode(long tokenId) {
+ return TOKEN_CACHE_NODE_MAP.remove(tokenId);
+ }
+
+ public static int getSize() {
+ return TOKEN_CACHE_NODE_MAP.size();
+ }
+
+ public static ConcurrentLinkedHashMap getCache() {
+ return TOKEN_CACHE_NODE_MAP;
+ }
+
+ public static boolean validToken(TokenCacheNode cacheNode) {
+ return cacheNode.getTokenId() != null && cacheNode.getFlowId() != null && cacheNode.getClientTimeout() >= 0 && cacheNode.getResourceTimeout() >= 0;
+ }
+}
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/expire/ExpireStrategy.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/expire/ExpireStrategy.java
new file mode 100644
index 0000000000..d733ff15d2
--- /dev/null
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/expire/ExpireStrategy.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.expire;
+
+import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.TokenCacheNode;
+import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
+
+/**
+ * @author yunfeiyagnggzq
+ */
+public interface ExpireStrategy {
+ /**
+ * clean expireKey-Value
+ *
+ * @param map the map needs to detect expired tokens.
+ * @return the number of the key cleaned
+ */
+ void removeExpireKey(ConcurrentLinkedHashMap map);
+}
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/expire/RegularExpireStrategy.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/expire/RegularExpireStrategy.java
new file mode 100644
index 0000000000..b4873a13bd
--- /dev/null
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/expire/RegularExpireStrategy.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.expire;
+
+import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager;
+import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.CurrentConcurrencyManager;
+import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.TokenCacheNode;
+import com.alibaba.csp.sentinel.cluster.server.connection.ConnectionManager;
+import com.alibaba.csp.sentinel.log.RecordLog;
+import com.alibaba.csp.sentinel.util.AssertUtil;
+import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * We need to consider the situation that the token client goes offline
+ * or the resource call times out. It can be detected by sourceTimeout
+ * and clientTimeout. The resource calls timeout detection is triggered
+ * on the token client. If the resource is called over time, the token
+ * client will request the token server to release token or refresh the
+ * token. The client offline detection is triggered on the token server.
+ * If the offline detection time is exceeded, token server will trigger
+ * the detection token client’s status. If the token client is offline,
+ * token server will delete the corresponding tokenId. If it is not offline,
+ * token server will continue to save it.
+ *
+ * @author yunfeiyanggzq
+ **/
+public class RegularExpireStrategy implements ExpireStrategy {
+ /**
+ * The max number of token deleted each time,
+ * the number of expired key-value pairs deleted each time does not exceed this number
+ */
+ private long executeCount;
+ /**
+ * Length of time for task execution
+ */
+ private long executeDuration;
+ /**
+ * Frequency of task execution
+ */
+ private long executeRate;
+ /**
+ * the local cache of tokenId
+ */
+ private ConcurrentLinkedHashMap localCache;
+
+ @SuppressWarnings("PMD.ThreadPoolCreationRule")
+ private static ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+
+ private final long DEFAULT_EXECUTE_COUNT = 1000;
+ private final long DEFAULT_EXECUTE_DURATION = 600;
+ private final long DEFAULT_EXECUTE_RATE = 1000;
+
+ public RegularExpireStrategy() {
+ this.executeCount = DEFAULT_EXECUTE_COUNT;
+ this.executeDuration = DEFAULT_EXECUTE_DURATION;
+ this.executeRate = DEFAULT_EXECUTE_RATE;
+ }
+
+ @Override
+ public void removeExpireKey(ConcurrentLinkedHashMap localCache) {
+ AssertUtil.isTrue(localCache != null, " local cache can't be null");
+ this.localCache = localCache;
+ executor.scheduleAtFixedRate(new MyTask(), 0, executeRate, TimeUnit.MILLISECONDS);
+ }
+
+ private class MyTask implements Runnable {
+ @Override
+ public void run() {
+ try {
+ clearToken();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ RecordLog.warn("[RegularExpireStrategy] undefined throwable<{}> during clear token", e);
+ }
+ }
+ }
+
+ private void clearToken() {
+ long start = System.currentTimeMillis();
+ List keyList = new ArrayList<>(localCache.keySet());
+ for (int i = 0; i < executeCount && i < keyList.size(); i++) {
+ // time out execution exit
+ if (System.currentTimeMillis() - start > executeDuration) {
+ RecordLog.info("[RegularExpireStrategy] End the process of expired token detection because of execute time is more than executeDuration<{}>", executeDuration);
+ break;
+ }
+ // use ConcurrentLinkedHashMap to improve the expiration detection progress
+ Long key = keyList.get(i);
+ TokenCacheNode node = localCache.get(key);
+ if (node == null) {
+ continue;
+ }
+ if (!ConnectionManager.isClientOnline(node.getClientAddress()) && node.getClientTimeout() - System.currentTimeMillis() < 0) {
+ removeToken(key, node);
+ System.out.println("客户端掉线删除"+node.getClientAddress());
+ RecordLog.info("[RegularExpireStrategy] Delete the expired token<{}> because of client offline", node.getTokenId());
+ continue;
+ }
+
+ // If we find that token's save time is much longer than the client's call resource timeout time, token will be determined to timeout and the client go wrong
+ long resourceTimeout = ClusterFlowRuleManager.getFlowRuleById(node.getFlowId()).getClusterConfig().getResourceTimeout();
+ if (System.currentTimeMillis() - node.getResourceTimeout() > 2 * resourceTimeout) {
+ System.out.println("保存超时删除"+node.getClientAddress());
+ removeToken(key, node);
+ RecordLog.info("[RegularExpireStrategy] Delete the expired token<{}> because of resource timeout", node.getTokenId());
+ }
+ }
+ }
+
+ private void removeToken(long tokenId, TokenCacheNode node) {
+ if (localCache.remove(tokenId) == null) {
+ RecordLog.info("[RegularExpireStrategy] Token<{}> is already released", tokenId);
+ return;
+ }
+ AtomicInteger nowCalls = CurrentConcurrencyManager.get(node.getFlowId());
+ if (nowCalls == null) {
+ return;
+ }
+ nowCalls.getAndAdd(node.getAcquireCount() * -1);
+ }
+}
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/DefaultEmbeddedTokenServer.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/DefaultEmbeddedTokenServer.java
index fd95cbf2f9..18e3026ea0 100644
--- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/DefaultEmbeddedTokenServer.java
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/DefaultEmbeddedTokenServer.java
@@ -15,12 +15,12 @@
*/
package com.alibaba.csp.sentinel.cluster.server;
-import java.util.Collection;
-
import com.alibaba.csp.sentinel.cluster.TokenResult;
import com.alibaba.csp.sentinel.cluster.TokenResultStatus;
import com.alibaba.csp.sentinel.cluster.TokenService;
+import java.util.Collection;
+
/**
* Default embedded token server in Sentinel which wraps the {@link SentinelDefaultTokenServer}
* and the {@link TokenService} from SPI provider.
@@ -58,4 +58,20 @@ public TokenResult requestParamToken(Long ruleId, int acquireCount, Collection {
+
+ @Override
+ public ConcurrentFlowAcquireRequestData decode(ByteBuf source) {
+ if (source.readableBytes() == 12) {
+ ConcurrentFlowAcquireRequestData requestData = new ConcurrentFlowAcquireRequestData()
+ .setFlowId(source.readLong())
+ .setCount(source.readInt());
+ return requestData;
+ }
+ return null;
+ }
+}
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/data/ConcurrentFlowAcquireResponseDataWriter.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/data/ConcurrentFlowAcquireResponseDataWriter.java
new file mode 100644
index 0000000000..67e4a0b55c
--- /dev/null
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/data/ConcurrentFlowAcquireResponseDataWriter.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.cluster.server.codec.data;
+
+import com.alibaba.csp.sentinel.cluster.codec.EntityWriter;
+import com.alibaba.csp.sentinel.cluster.response.data.ConcurrentFlowAcquireResponseData;
+import io.netty.buffer.ByteBuf;
+
+/**
+ * @author yunfeiyanggzq
+ */
+public class ConcurrentFlowAcquireResponseDataWriter implements EntityWriter {
+ @Override
+ public void writeTo(ConcurrentFlowAcquireResponseData entity, ByteBuf out) {
+ if (entity == null || out == null) {
+ return;
+ }
+ out.writeLong(entity.getTokenId());
+ }
+}
\ No newline at end of file
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/data/ConcurrentFlowReleaseRequestDataDecoder.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/data/ConcurrentFlowReleaseRequestDataDecoder.java
new file mode 100644
index 0000000000..80a9c4781a
--- /dev/null
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/data/ConcurrentFlowReleaseRequestDataDecoder.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.cluster.server.codec.data;
+
+import com.alibaba.csp.sentinel.cluster.codec.EntityDecoder;
+import com.alibaba.csp.sentinel.cluster.request.data.ConcurrentFlowAcquireRequestData;
+import com.alibaba.csp.sentinel.cluster.request.data.ConcurrentFlowReleaseRequestData;
+import io.netty.buffer.ByteBuf;
+
+/**
+ * @author yunfeiyanggzq
+ */
+public class ConcurrentFlowReleaseRequestDataDecoder implements EntityDecoder {
+ @Override
+ public ConcurrentFlowReleaseRequestData decode(ByteBuf source) {
+ ConcurrentFlowReleaseRequestData requestData = new ConcurrentFlowReleaseRequestData().setTokenId(source.readLong());
+ return requestData;
+ }
+}
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/data/ConcurrentFlowReleaseResponseDataWriter.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/data/ConcurrentFlowReleaseResponseDataWriter.java
new file mode 100644
index 0000000000..53d5ddc852
--- /dev/null
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/data/ConcurrentFlowReleaseResponseDataWriter.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.cluster.server.codec.data;
+
+import com.alibaba.csp.sentinel.cluster.codec.EntityWriter;
+import com.alibaba.csp.sentinel.cluster.response.data.ConcurrentFlowAcquireResponseData;
+import com.alibaba.csp.sentinel.cluster.response.data.ConcurrentFlowReleaseResponseData;
+import io.netty.buffer.ByteBuf;
+
+/**
+ * @author yunfeiyanggzq
+ */
+public class ConcurrentFlowReleaseResponseDataWriter implements EntityWriter {
+ @Override
+ public void writeTo(ConcurrentFlowReleaseResponseData entity, ByteBuf target) {
+ }
+}
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/data/FlowRequestDataDecoder.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/data/FlowRequestDataDecoder.java
index 5391be58d6..803d08b28a 100644
--- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/data/FlowRequestDataDecoder.java
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/data/FlowRequestDataDecoder.java
@@ -25,7 +25,7 @@
* Decoder for {@link FlowRequestData} from {@code ByteBuf} stream. The layout:
*
*
- * | flow ID (8) | count (4) | priority flag (1) |
+ * | flow ID (4) | count (4) | priority flag (1) |
*
*
* @author Eric Zhao
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/FetchClusterConcurrencyCommandHandler.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/FetchClusterConcurrencyCommandHandler.java
new file mode 100644
index 0000000000..82050bea21
--- /dev/null
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/FetchClusterConcurrencyCommandHandler.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.cluster.server.command.handler;
+
+import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.CurrentConcurrencyManager;
+import com.alibaba.csp.sentinel.command.CommandHandler;
+import com.alibaba.csp.sentinel.command.CommandRequest;
+import com.alibaba.csp.sentinel.command.CommandResponse;
+import com.alibaba.csp.sentinel.command.annotation.CommandMapping;
+import com.alibaba.csp.sentinel.util.StringUtil;
+import com.alibaba.fastjson.JSON;
+
+/**
+ * @author yunfeiyanggzq
+ */
+@CommandMapping(name = "cluster/server/concurrency", desc = "get cluster concurrency")
+public class FetchClusterConcurrencyCommandHandler implements CommandHandler {
+ @Override
+ public CommandResponse handle(CommandRequest request) {
+ String flowId = request.getParam("flowId");
+ if (!StringUtil.isEmpty(flowId)) {
+ return CommandResponse.ofSuccess(JSON.toJSONString(CurrentConcurrencyManager.get(Long.valueOf(flowId))));
+ } else {
+ return CommandResponse.ofSuccess(JSON.toJSONString(CurrentConcurrencyManager.getConcurrencyMap()));
+ }
+ }
+}
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ConnectionManager.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ConnectionManager.java
index 5f6a42d133..10d59d93e3 100644
--- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ConnectionManager.java
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ConnectionManager.java
@@ -111,6 +111,9 @@ public static ConnectionGroup getConnectionGroup(String namespace) {
ConnectionGroup group = CONN_MAP.get(namespace);
return group;
}
+ public static boolean isClientOnline(String clientAddress){
+ return NAMESPACE_MAP.containsKey(clientAddress);
+ }
static void clear() {
CONN_MAP.clear();
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/handler/TokenServerHandler.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/handler/TokenServerHandler.java
index da5375d5ed..14151db5f0 100644
--- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/handler/TokenServerHandler.java
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/handler/TokenServerHandler.java
@@ -76,7 +76,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
RecordLog.warn("[TokenServerHandler] No processor for request type: " + request.getType());
writeBadResponse(ctx, request);
} else {
- ClusterResponse> response = processor.processRequest(request);
+ ClusterResponse> response = processor.processRequest(ctx,request);
writeResponse(ctx, response);
}
}
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/init/DefaultClusterServerInitFunc.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/init/DefaultClusterServerInitFunc.java
index 82b3bdc3a3..5e6fa87023 100644
--- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/init/DefaultClusterServerInitFunc.java
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/init/DefaultClusterServerInitFunc.java
@@ -17,11 +17,7 @@
import com.alibaba.csp.sentinel.cluster.ClusterConstants;
import com.alibaba.csp.sentinel.cluster.server.TokenServiceProvider;
-import com.alibaba.csp.sentinel.cluster.server.codec.data.FlowRequestDataDecoder;
-import com.alibaba.csp.sentinel.cluster.server.codec.data.FlowResponseDataWriter;
-import com.alibaba.csp.sentinel.cluster.server.codec.data.ParamFlowRequestDataDecoder;
-import com.alibaba.csp.sentinel.cluster.server.codec.data.PingRequestDataDecoder;
-import com.alibaba.csp.sentinel.cluster.server.codec.data.PingResponseDataWriter;
+import com.alibaba.csp.sentinel.cluster.server.codec.data.*;
import com.alibaba.csp.sentinel.cluster.server.codec.registry.RequestDataDecodeRegistry;
import com.alibaba.csp.sentinel.cluster.server.codec.registry.ResponseDataWriterRegistry;
import com.alibaba.csp.sentinel.cluster.server.processor.RequestProcessorProvider;
@@ -51,12 +47,16 @@ private void initDefaultEntityWriters() {
ResponseDataWriterRegistry.addWriter(ClusterConstants.MSG_TYPE_PING, new PingResponseDataWriter());
ResponseDataWriterRegistry.addWriter(ClusterConstants.MSG_TYPE_FLOW, new FlowResponseDataWriter());
ResponseDataWriterRegistry.addWriter(ClusterConstants.MSG_TYPE_PARAM_FLOW, new FlowResponseDataWriter());
+ ResponseDataWriterRegistry.addWriter(ClusterConstants.MSG_TYPE_CONCURRENT_FLOW_ACQUIRE, new ConcurrentFlowAcquireResponseDataWriter());
+ ResponseDataWriterRegistry.addWriter(ClusterConstants.MSG_TYPE_CONCURRENT_FLOW_RELEASE, new ConcurrentFlowReleaseResponseDataWriter());
}
private void initDefaultEntityDecoders() {
RequestDataDecodeRegistry.addDecoder(ClusterConstants.MSG_TYPE_PING, new PingRequestDataDecoder());
RequestDataDecodeRegistry.addDecoder(ClusterConstants.MSG_TYPE_FLOW, new FlowRequestDataDecoder());
RequestDataDecodeRegistry.addDecoder(ClusterConstants.MSG_TYPE_PARAM_FLOW, new ParamFlowRequestDataDecoder());
+ RequestDataDecodeRegistry.addDecoder(ClusterConstants.MSG_TYPE_CONCURRENT_FLOW_ACQUIRE, new ConcurrentFlowAcquireRequestDataDecoder());
+ RequestDataDecodeRegistry.addDecoder(ClusterConstants.MSG_TYPE_CONCURRENT_FLOW_RELEASE, new ConcurrentFlowReleaseRequestDataDecoder());
}
private void initDefaultProcessors() {
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/ConcurrentFlowRequestAcquireProcessor.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/ConcurrentFlowRequestAcquireProcessor.java
new file mode 100644
index 0000000000..9ed51d15be
--- /dev/null
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/ConcurrentFlowRequestAcquireProcessor.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.cluster.server.processor;
+
+import com.alibaba.csp.sentinel.cluster.ClusterConstants;
+import com.alibaba.csp.sentinel.cluster.TokenResult;
+import com.alibaba.csp.sentinel.cluster.TokenService;
+import com.alibaba.csp.sentinel.cluster.annotation.RequestType;
+import com.alibaba.csp.sentinel.cluster.request.ClusterRequest;
+import com.alibaba.csp.sentinel.cluster.request.data.ConcurrentFlowAcquireRequestData;
+import com.alibaba.csp.sentinel.cluster.response.ClusterResponse;
+import com.alibaba.csp.sentinel.cluster.response.data.ConcurrentFlowAcquireResponseData;
+import com.alibaba.csp.sentinel.cluster.server.TokenServiceProvider;
+import io.netty.channel.ChannelHandlerContext;
+
+import java.net.InetSocketAddress;
+
+/**
+ * @author yunfeiyanggzq
+ */
+@RequestType(ClusterConstants.MSG_TYPE_CONCURRENT_FLOW_ACQUIRE)
+public class ConcurrentFlowRequestAcquireProcessor implements RequestProcessor {
+ @Override
+ public ClusterResponse processRequest(ChannelHandlerContext ctx, ClusterRequest request) {
+ TokenService tokenService = TokenServiceProvider.getService();
+ long flowId = request.getData().getFlowId();
+ int count = request.getData().getCount();
+ String clientAddress = getRemoteAddress(ctx);
+ TokenResult result = tokenService.requestConcurrentToken(clientAddress, flowId, count);
+ return toResponse(result, request);
+ }
+
+ private ClusterResponse toResponse(TokenResult result, ClusterRequest request) {
+ return new ClusterResponse<>(request.getId(), request.getType(), result.getStatus(),
+ new ConcurrentFlowAcquireResponseData().setTokenId(result.getTokenId())
+ );
+ }
+
+ private String getRemoteAddress(ChannelHandlerContext ctx) {
+ if (ctx.channel().remoteAddress() == null) {
+ return null;
+ }
+ InetSocketAddress inetAddress = (InetSocketAddress) ctx.channel().remoteAddress();
+ return inetAddress.getAddress().getHostAddress() + ":" + inetAddress.getPort();
+ }
+}
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/ConcurrentFlowRequestReleaseProcessor.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/ConcurrentFlowRequestReleaseProcessor.java
new file mode 100644
index 0000000000..c5ec96594b
--- /dev/null
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/ConcurrentFlowRequestReleaseProcessor.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.cluster.server.processor;
+
+import com.alibaba.csp.sentinel.cluster.ClusterConstants;
+import com.alibaba.csp.sentinel.cluster.TokenResult;
+import com.alibaba.csp.sentinel.cluster.TokenService;
+import com.alibaba.csp.sentinel.cluster.annotation.RequestType;
+import com.alibaba.csp.sentinel.cluster.request.ClusterRequest;
+import com.alibaba.csp.sentinel.cluster.request.data.ConcurrentFlowReleaseRequestData;
+import com.alibaba.csp.sentinel.cluster.response.ClusterResponse;
+import com.alibaba.csp.sentinel.cluster.response.data.ConcurrentFlowReleaseResponseData;
+import com.alibaba.csp.sentinel.cluster.server.TokenServiceProvider;
+import io.netty.channel.ChannelHandlerContext;
+
+/**
+ * @author yunfeiyanggzq
+ */
+
+@RequestType(ClusterConstants.MSG_TYPE_CONCURRENT_FLOW_RELEASE)
+public class ConcurrentFlowRequestReleaseProcessor implements RequestProcessor {
+ @Override
+ public ClusterResponse processRequest(ChannelHandlerContext ctx, ClusterRequest request) {
+ TokenService tokenService = TokenServiceProvider.getService();
+ long tokenId = request.getData().getTokenId();
+ TokenResult result = tokenService.releaseConcurrentToken(tokenId);
+ return toResponse(result, request);
+ }
+
+ private ClusterResponse toResponse(TokenResult result, ClusterRequest request) {
+ return new ClusterResponse<>(request.getId(), request.getType(), result.getStatus(), null);
+ }
+}
+
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/FlowRequestProcessor.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/FlowRequestProcessor.java
index d08a84b3ed..d5df7b7914 100644
--- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/FlowRequestProcessor.java
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/FlowRequestProcessor.java
@@ -24,6 +24,7 @@
import com.alibaba.csp.sentinel.cluster.response.ClusterResponse;
import com.alibaba.csp.sentinel.cluster.response.data.FlowTokenResponseData;
import com.alibaba.csp.sentinel.cluster.server.TokenServiceProvider;
+import io.netty.channel.ChannelHandlerContext;
/**
* @author Eric Zhao
@@ -33,7 +34,7 @@
public class FlowRequestProcessor implements RequestProcessor {
@Override
- public ClusterResponse processRequest(ClusterRequest request) {
+ public ClusterResponse processRequest(ChannelHandlerContext ctx,ClusterRequest request) {
TokenService tokenService = TokenServiceProvider.getService();
long flowId = request.getData().getFlowId();
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/ParamFlowRequestProcessor.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/ParamFlowRequestProcessor.java
index 888fc2b5f3..0c395309af 100644
--- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/ParamFlowRequestProcessor.java
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/ParamFlowRequestProcessor.java
@@ -26,6 +26,7 @@
import com.alibaba.csp.sentinel.cluster.response.ClusterResponse;
import com.alibaba.csp.sentinel.cluster.response.data.FlowTokenResponseData;
import com.alibaba.csp.sentinel.cluster.server.TokenServiceProvider;
+import io.netty.channel.ChannelHandlerContext;
/**
* @author Eric Zhao
@@ -35,7 +36,7 @@
public class ParamFlowRequestProcessor implements RequestProcessor {
@Override
- public ClusterResponse processRequest(ClusterRequest request) {
+ public ClusterResponse processRequest(ChannelHandlerContext ctx,ClusterRequest request) {
TokenService tokenService = TokenServiceProvider.getService();
long flowId = request.getData().getFlowId();
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/RequestProcessor.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/RequestProcessor.java
index 8cce73ea2c..f53bf0a87c 100644
--- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/RequestProcessor.java
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/RequestProcessor.java
@@ -17,6 +17,7 @@
import com.alibaba.csp.sentinel.cluster.request.ClusterRequest;
import com.alibaba.csp.sentinel.cluster.response.ClusterResponse;
+import io.netty.channel.ChannelHandlerContext;
/**
* Interface of cluster request processor.
@@ -31,8 +32,9 @@ public interface RequestProcessor {
/**
* Process the cluster request.
*
+ * @param ctx The ChannelHandlerContext of the request.
* @param request Sentinel cluster request
* @return the response after processed
*/
- ClusterResponse processRequest(ClusterRequest request);
+ ClusterResponse processRequest(ChannelHandlerContext ctx, ClusterRequest request);
}
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.cluster.server.processor.RequestProcessor b/sentinel-cluster/sentinel-cluster-server-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.cluster.server.processor.RequestProcessor
index 991b781dea..a256d5641d 100644
--- a/sentinel-cluster/sentinel-cluster-server-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.cluster.server.processor.RequestProcessor
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.cluster.server.processor.RequestProcessor
@@ -1,2 +1,4 @@
com.alibaba.csp.sentinel.cluster.server.processor.FlowRequestProcessor
-com.alibaba.csp.sentinel.cluster.server.processor.ParamFlowRequestProcessor
\ No newline at end of file
+com.alibaba.csp.sentinel.cluster.server.processor.ParamFlowRequestProcessor
+com.alibaba.csp.sentinel.cluster.server.processor.ConcurrentFlowRequestAcquireProcessor
+com.alibaba.csp.sentinel.cluster.server.processor.ConcurrentFlowRequestReleaseProcessor
\ No newline at end of file
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.command.CommandHandler b/sentinel-cluster/sentinel-cluster-server-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.command.CommandHandler
index 0ebc4f5565..dc818e8015 100755
--- a/sentinel-cluster/sentinel-cluster-server-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.command.CommandHandler
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.command.CommandHandler
@@ -7,4 +7,5 @@ com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyServerNamespaceSet
com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyClusterFlowRulesCommandHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyClusterParamFlowRulesCommandHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterServerInfoCommandHandler
-com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterMetricCommandHandler
\ No newline at end of file
+com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterMetricCommandHandler
+com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterConcurrencyCommandHandler
\ No newline at end of file
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/cluster/flow/ConcurrentClusterFlowCheckerTest.java b/sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/cluster/flow/ConcurrentClusterFlowCheckerTest.java
new file mode 100644
index 0000000000..1479786769
--- /dev/null
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/cluster/flow/ConcurrentClusterFlowCheckerTest.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.cluster.flow;
+
+import com.alibaba.csp.sentinel.cluster.TokenResult;
+import com.alibaba.csp.sentinel.cluster.TokenResultStatus;
+import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager;
+import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.CurrentConcurrencyManager;
+import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.TokenCacheNodeManager;
+import com.alibaba.csp.sentinel.cluster.server.connection.ConnectionManager;
+import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant;
+import com.alibaba.csp.sentinel.slots.block.RuleConstant;
+import com.alibaba.csp.sentinel.slots.block.flow.ClusterFlowConfig;
+import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
+import com.alibaba.csp.sentinel.test.AbstractTimeBasedTest;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * @author yunfeiyanggzq
+ */
+public class ConcurrentClusterFlowCheckerTest extends AbstractTimeBasedTest {
+ @Before
+ public void setUp() {
+ FlowRule rule = new FlowRule();
+ ClusterFlowConfig config = new ClusterFlowConfig();
+ config.setResourceTimeout(500);
+ config.setClientOfflineTime(1000);
+ config.setFlowId(111L);
+ config.setThresholdType(ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL);
+ rule.setClusterConfig(config);
+ rule.setClusterMode(true);
+ rule.setCount(10);
+ rule.setResource("test");
+ rule.setGrade(RuleConstant.FLOW_GRADE_THREAD);
+ ArrayList rules = new ArrayList<>();
+ rules.add(rule);
+ ClusterFlowRuleManager.registerPropertyIfAbsent("1-name");
+ ClusterFlowRuleManager.loadRules("1-name", rules);
+ }
+
+ @Test
+ public void testEasyAcquireAndRelease() throws InterruptedException {
+ setCurrentMillis(System.currentTimeMillis());
+ FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(111L);
+ ArrayList list = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ TokenResult result = ConcurrentClusterFlowChecker.acquireConcurrentToken("127.0.0.1", rule, 1);
+ Assert.assertTrue("fail to acquire token",
+ result.getStatus() == TokenResultStatus.OK && result.getTokenId() != 0);
+ list.add(result.getTokenId());
+ }
+ for (int i = 0; i < 10; i++) {
+ TokenResult result = ConcurrentClusterFlowChecker.acquireConcurrentToken("127.0.0.1", rule, 1);
+ Assert.assertTrue("fail to acquire block token",
+ result.getStatus() == TokenResultStatus.BLOCKED);
+ }
+ for (int i = 0; i < 10; i++) {
+ TokenResult result = ConcurrentClusterFlowChecker.releaseConcurrentToken(list.get(i));
+ Assert.assertTrue("fail to release token",
+ result.getStatus() == TokenResultStatus.RELEASE_OK);
+ }
+ Assert.assertTrue("fail to release token",
+ CurrentConcurrencyManager.get(111L).get() == 0 && TokenCacheNodeManager.getSize() == 0);
+ }
+
+ @Test
+ public void testConcurrentAcquireAndRelease() throws InterruptedException {
+ setCurrentMillis(System.currentTimeMillis());
+ final FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(111L);
+ final CountDownLatch countDownLatch = new CountDownLatch(1000);
+ ExecutorService pool = Executors.newFixedThreadPool(100);
+
+ for (long i = 0; i < 1000; i++) {
+ Runnable task = new Runnable() {
+ @Override
+ public void run() {
+ assert rule != null;
+ TokenResult result = ConcurrentClusterFlowChecker.acquireConcurrentToken("127.0.0.1", rule, 1);
+ Assert.assertTrue("concurrent control fail", CurrentConcurrencyManager.get(111L).get() <= rule.getCount());
+ if (result.getStatus() == TokenResultStatus.OK) {
+ ConcurrentClusterFlowChecker.releaseConcurrentToken(result.getTokenId());
+ }
+ countDownLatch.countDown();
+ }
+ };
+ pool.execute(task);
+ }
+ countDownLatch.await();
+ pool.shutdown();
+ assert rule != null;
+ Assert.assertTrue("fail to acquire and release token",
+ CurrentConcurrencyManager.get(rule.getClusterConfig().getFlowId()).get() == 0 && TokenCacheNodeManager.getSize() == 0);
+ }
+
+ @Test
+ public void testReleaseExpiredToken() throws InterruptedException {
+ ConnectionManager.addConnection("test", "127.0.0.1");
+ FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(111L);
+ for (int i = 0; i < 10; i++) {
+ ConcurrentClusterFlowChecker.acquireConcurrentToken("127.0.0.1", rule, 1);
+ }
+ Thread.sleep(2000);
+ Assert.assertTrue("fail to acquire and release token", CurrentConcurrencyManager.get(rule.getClusterConfig().getFlowId()).get() == 0 && TokenCacheNodeManager.getSize() == 0);
+ }
+}
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/cluster/flow/statstic/concurrent/CurrentConcurrencyManagerTest.java b/sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/cluster/flow/statstic/concurrent/CurrentConcurrencyManagerTest.java
new file mode 100644
index 0000000000..82009773a8
--- /dev/null
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/cluster/flow/statstic/concurrent/CurrentConcurrencyManagerTest.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.cluster.flow.statstic.concurrent;
+
+import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.CurrentConcurrencyManager;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class CurrentConcurrencyManagerTest {
+ @Test
+ public void updateTest() throws InterruptedException {
+ CurrentConcurrencyManager.put(111L, 0);
+ CurrentConcurrencyManager.put(222L, 0);
+ final CountDownLatch countDownLatch = new CountDownLatch(1000);
+ ExecutorService pool = Executors.newFixedThreadPool(100);
+ for (int i = 0; i < 1000; i++) {
+ Runnable task = new Runnable() {
+ @Override
+ public void run() {
+ CurrentConcurrencyManager.update(111L, 1);
+ CurrentConcurrencyManager.update(222L, 2);
+ countDownLatch.countDown();
+ }
+ };
+ pool.execute(task);
+ }
+ countDownLatch.await();
+ pool.shutdown();
+ Assert.assertEquals(1000, CurrentConcurrencyManager.get(111L).get());
+ Assert.assertEquals(2000, CurrentConcurrencyManager.get(222L).get());
+ }
+}
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/cluster/flow/statstic/concurrent/TokenCacheNodeManagerTest.java b/sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/cluster/flow/statstic/concurrent/TokenCacheNodeManagerTest.java
new file mode 100644
index 0000000000..c644eda8a9
--- /dev/null
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/cluster/flow/statstic/concurrent/TokenCacheNodeManagerTest.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.cluster.flow.statstic.concurrent;
+
+import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.TokenCacheNode;
+import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.TokenCacheNodeManager;
+import com.alibaba.csp.sentinel.test.AbstractTimeBasedTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class TokenCacheNodeManagerTest extends AbstractTimeBasedTest {
+ @Test
+ public void testPutTokenCacheNode() throws InterruptedException {
+ setCurrentMillis(System.currentTimeMillis());
+
+ for (long i = 0; i < 100; i++) {
+ final TokenCacheNode node = new TokenCacheNode();
+ node.setTokenId(i);
+ node.setFlowId(111L);
+ node.setResourceTimeout(10000L);
+ node.setClientTimeout(10000L);
+ node.setClientAddress("localhost");
+ if (TokenCacheNodeManager.validToken(node)) {
+ TokenCacheNodeManager.putTokenCacheNode(node.getTokenId(), node);
+
+ }
+ }
+ Assert.assertEquals(100, TokenCacheNodeManager.getSize());
+ for (int i = 0; i < 100; i++) {
+ TokenCacheNodeManager.getTokenCacheNode((long) (Math.random() * 100));
+ }
+ List keyList = new ArrayList<>(TokenCacheNodeManager.getCache().keySet());
+ for (int i = 0; i < 100; i++) {
+ Assert.assertEquals(i, (long) keyList.get(i));
+ }
+ }
+}
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/test/AbstractTimeBasedTest.java b/sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/test/AbstractTimeBasedTest.java
new file mode 100644
index 0000000000..6e8bab4d1e
--- /dev/null
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/test/AbstractTimeBasedTest.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.test;
+
+import com.alibaba.csp.sentinel.util.TimeUtil;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Mock support for {@link TimeUtil}
+ *
+ * @author jason
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({TimeUtil.class})
+public abstract class AbstractTimeBasedTest {
+
+ private long currentMillis = 0;
+
+ {
+ PowerMockito.mockStatic(TimeUtil.class);
+ PowerMockito.when(TimeUtil.currentTimeMillis()).thenReturn(currentMillis);
+ }
+
+ protected final void useActualTime() {
+ PowerMockito.when(TimeUtil.currentTimeMillis()).thenCallRealMethod();
+ }
+
+ protected final void setCurrentMillis(long cur) {
+ currentMillis = cur;
+ PowerMockito.when(TimeUtil.currentTimeMillis()).thenReturn(currentMillis);
+ }
+
+ protected final void sleep(int t) {
+ currentMillis += t;
+ PowerMockito.when(TimeUtil.currentTimeMillis()).thenReturn(currentMillis);
+ }
+
+ protected final void sleepSecond(int timeSec) {
+ sleep(timeSec * 1000);
+ }
+}
diff --git a/sentinel-core/pom.xml b/sentinel-core/pom.xml
index f241734f07..0d54c27d45 100755
--- a/sentinel-core/pom.xml
+++ b/sentinel-core/pom.xml
@@ -28,6 +28,11 @@
+
+ io.netty
+ netty-all
+ 4.1.31.Final
+ org.mockitomockito-core
diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/Entry.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/Entry.java
index dd76377387..bbc8e94162 100755
--- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/Entry.java
+++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/Entry.java
@@ -15,12 +15,12 @@
*/
package com.alibaba.csp.sentinel;
-import com.alibaba.csp.sentinel.slots.block.BlockException;
-import com.alibaba.csp.sentinel.util.TimeUtil;
+import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.context.ContextUtil;
import com.alibaba.csp.sentinel.node.Node;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
-import com.alibaba.csp.sentinel.context.Context;
+import com.alibaba.csp.sentinel.slots.block.BlockException;
+import com.alibaba.csp.sentinel.util.TimeUtil;
/**
* Each {@link SphU}#entry() will return an {@link Entry}. This class holds information of current invocation:
@@ -66,6 +66,7 @@ public abstract class Entry implements AutoCloseable {
private Throwable error;
private BlockException blockError;
+ private long tokenId = 0;
protected final ResourceWrapper resourceWrapper;
public Entry(ResourceWrapper resourceWrapper) {
@@ -73,6 +74,14 @@ public Entry(ResourceWrapper resourceWrapper) {
this.createTimestamp = TimeUtil.currentTimeMillis();
}
+ public long getTokenId() {
+ return tokenId;
+ }
+
+ public void setTokenId(long tokenId) {
+ this.tokenId = tokenId;
+ }
+
public ResourceWrapper getResourceWrapper() {
return resourceWrapper;
}
@@ -104,7 +113,7 @@ public void close() {
* Exit this entry. This method should invoke if and only if once at the end of the resource protection.
*
* @param count tokens to release.
- * @param args extra parameters
+ * @param args extra parameters
* @throws ErrorEntryFreeException, if {@link Context#getCurEntry()} is not this entry.
*/
public abstract void exit(int count, Object... args) throws ErrorEntryFreeException;
@@ -113,7 +122,7 @@ public void close() {
* Exit this entry.
*
* @param count tokens to release.
- * @param args extra parameters
+ * @param args extra parameters
* @return next available entry after exit, that is the parent entry.
* @throws ErrorEntryFreeException, if {@link Context#getCurEntry()} is not this entry.
*/
@@ -178,4 +187,17 @@ public void setOriginNode(Node originNode) {
this.originNode = originNode;
}
+ @Override
+ public String toString() {
+ return "Entry{" +
+ "createTimestamp=" + createTimestamp +
+ ", completeTimestamp=" + completeTimestamp +
+ ", curNode=" + curNode +
+ ", originNode=" + originNode +
+ ", error=" + error +
+ ", blockError=" + blockError +
+ ", tokenId=" + tokenId +
+ ", resourceWrapper=" + resourceWrapper +
+ '}';
+ }
}
diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/cluster/TokenResult.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/cluster/TokenResult.java
index 29fa064a2f..b16b76f0e8 100644
--- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/cluster/TokenResult.java
+++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/cluster/TokenResult.java
@@ -30,14 +30,25 @@ public class TokenResult {
private int remaining;
private int waitInMs;
+ private long tokenId;
+
private Map attachments;
- public TokenResult() {}
+ public TokenResult() {
+ }
public TokenResult(Integer status) {
this.status = status;
}
+ public long getTokenId() {
+ return tokenId;
+ }
+
+ public void setTokenId(long tokenId) {
+ this.tokenId = tokenId;
+ }
+
public Integer getStatus() {
return status;
}
@@ -77,10 +88,11 @@ public TokenResult setAttachments(Map attachments) {
@Override
public String toString() {
return "TokenResult{" +
- "status=" + status +
- ", remaining=" + remaining +
- ", waitInMs=" + waitInMs +
- ", attachments=" + attachments +
- '}';
+ "status=" + status +
+ ", remaining=" + remaining +
+ ", waitInMs=" + waitInMs +
+ ", attachments=" + attachments +
+ ", tokenId=" + tokenId +
+ '}';
}
}
diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/cluster/TokenResultStatus.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/cluster/TokenResultStatus.java
index ff5dddf5f1..19a58b5067 100644
--- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/cluster/TokenResultStatus.java
+++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/cluster/TokenResultStatus.java
@@ -60,5 +60,10 @@ public final class TokenResultStatus {
*/
public static final int NOT_AVAILABLE = 5;
- private TokenResultStatus() {}
+ public static final int RELEASE_OK = 6;
+
+ public static final int ALREADY_RELEASE=7;
+
+ private TokenResultStatus() {
+ }
}
diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/cluster/TokenService.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/cluster/TokenService.java
index f1d062921d..b0bdf489ff 100644
--- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/cluster/TokenService.java
+++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/cluster/TokenService.java
@@ -44,4 +44,9 @@ public interface TokenService {
* @return result of the token request
*/
TokenResult requestParamToken(Long ruleId, int acquireCount, Collection
+
+ com.alibaba.csp
+ sentinel-cluster-client-default
+ ${project.version}
+ com.alibaba.csp
diff --git a/sentinel-demo/sentinel-demo-cluster/sentinel-demo-cluster-server-alone/src/main/java/com/alibaba/csp/sentinel/demo/cluster/ClusterClientDemo.java b/sentinel-demo/sentinel-demo-cluster/sentinel-demo-cluster-server-alone/src/main/java/com/alibaba/csp/sentinel/demo/cluster/ClusterClientDemo.java
new file mode 100644
index 0000000000..1786fdbab1
--- /dev/null
+++ b/sentinel-demo/sentinel-demo-cluster/sentinel-demo-cluster-server-alone/src/main/java/com/alibaba/csp/sentinel/demo/cluster/ClusterClientDemo.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.demo.cluster;
+
+import com.alibaba.csp.sentinel.Entry;
+import com.alibaba.csp.sentinel.SphU;
+import com.alibaba.csp.sentinel.cluster.ClusterStateManager;
+import com.alibaba.csp.sentinel.cluster.server.ClusterTokenServer;
+import com.alibaba.csp.sentinel.cluster.server.SentinelDefaultTokenServer;
+import com.alibaba.csp.sentinel.slots.block.flow.timeout.ReSourceTimeoutStrategyUtil;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ *
Cluster server demo (alone mode).
+ *
Here we init the cluster server dynamic data sources in
+ * {@link com.alibaba.csp.sentinel.demo.cluster.init.DemoClusterServerInitFunc}.