Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP]cluster concurrent flow control #1629

Closed
wants to merge 13 commits into from
121 changes: 60 additions & 61 deletions sentinel-cluster/sentinel-cluster-client-default/pom.xml
Original file line number Diff line number Diff line change
@@ -1,62 +1,61 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>sentinel-cluster</artifactId>
<groupId>com.alibaba.csp</groupId>
<version>1.8.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>sentinel-cluster-client-default</artifactId>
<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-transport-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-cluster-common-default</artifactId>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
</dependency>

<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-nacos</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-parameter-flow-control</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>sentinel-cluster</artifactId>
<groupId>com.alibaba.csp</groupId>
<version>1.8.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>sentinel-cluster-client-default</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-transport-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-cluster-common-default</artifactId>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
</dependency>

<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-nacos</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-parameter-flow-control</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public final class ClientConstants {
public static final int TYPE_PING = 0;
public static final int TYPE_FLOW = 1;
public static final int TYPE_PARAM_FLOW = 2;
public static final int TYPE_CONCURRENT_FLOW_ACQUIRE = 3;
public static final int TYPE_CONCURRENT_FLOW_RELEASE = 4;

public static final int CLIENT_STATUS_OFF = 0;
public static final int CLIENT_STATUS_PENDING = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,25 @@
*/
package com.alibaba.csp.sentinel.cluster.client;

import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;

import com.alibaba.csp.sentinel.cluster.ClusterConstants;
import com.alibaba.csp.sentinel.cluster.ClusterErrorMessages;
import com.alibaba.csp.sentinel.cluster.ClusterTransportClient;
import com.alibaba.csp.sentinel.cluster.TokenResult;
import com.alibaba.csp.sentinel.cluster.TokenResultStatus;
import com.alibaba.csp.sentinel.cluster.TokenServerDescriptor;
import com.alibaba.csp.sentinel.cluster.*;
import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientAssignConfig;
import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientConfigManager;
import com.alibaba.csp.sentinel.cluster.client.config.ServerChangeObserver;
import com.alibaba.csp.sentinel.cluster.log.ClusterClientStatLogUtil;
import com.alibaba.csp.sentinel.cluster.request.ClusterRequest;
import com.alibaba.csp.sentinel.cluster.request.data.ConcurrentFlowAcquireRequestData;
import com.alibaba.csp.sentinel.cluster.request.data.ConcurrentFlowReleaseRequestData;
import com.alibaba.csp.sentinel.cluster.request.data.FlowRequestData;
import com.alibaba.csp.sentinel.cluster.request.data.ParamFlowRequestData;
import com.alibaba.csp.sentinel.cluster.response.ClusterResponse;
import com.alibaba.csp.sentinel.cluster.response.data.ConcurrentFlowAcquireResponseData;
import com.alibaba.csp.sentinel.cluster.response.data.FlowTokenResponseData;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.util.StringUtil;

import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Default implementation of {@link ClusterTokenClient}.
*
Expand Down Expand Up @@ -152,7 +150,7 @@ public TokenResult requestToken(Long flowId, int acquireCount, boolean prioritiz
return badRequest();
}
FlowRequestData data = new FlowRequestData().setCount(acquireCount)
.setFlowId(flowId).setPriority(prioritized);
.setFlowId(flowId).setPriority(prioritized);
ClusterRequest<FlowRequestData> request = new ClusterRequest<>(ClusterConstants.MSG_TYPE_FLOW, data);
try {
TokenResult result = sendTokenRequest(request);
Expand All @@ -170,7 +168,7 @@ public TokenResult requestParamToken(Long flowId, int acquireCount, Collection<O
return badRequest();
}
ParamFlowRequestData data = new ParamFlowRequestData().setCount(acquireCount)
.setFlowId(flowId).setParams(params);
.setFlowId(flowId).setParams(params);
ClusterRequest<ParamFlowRequestData> request = new ClusterRequest<>(ClusterConstants.MSG_TYPE_PARAM_FLOW, data);
try {
TokenResult result = sendTokenRequest(request);
Expand All @@ -182,6 +180,37 @@ public TokenResult requestParamToken(Long flowId, int acquireCount, Collection<O
}
}

@Override
public TokenResult requestConcurrentToken(String clientAddress, Long ruleId, int acquireCount, boolean prioritized) {
if (notValidRequest(ruleId, acquireCount)) {
return badRequest();
}
ConcurrentFlowAcquireRequestData data = new ConcurrentFlowAcquireRequestData().setFlowId(ruleId).setCount(acquireCount).setPrioritized(prioritized);
ClusterRequest<ConcurrentFlowAcquireRequestData> request = new ClusterRequest<>(ClusterConstants.MSG_TYPE_CONCURRENT_FLOW_ACQUIRE, data);
try {
TokenResult result = sendTokenRequest(request);
logForResult(result);
return result;
} catch (Exception ex) {
ClusterClientStatLogUtil.log(ex.getMessage());
return new TokenResult(TokenResultStatus.FAIL);
}
}

@Override
public void releaseConcurrentToken(Long tokenId) {
if (tokenId == null) {
return;
}
ConcurrentFlowReleaseRequestData data = new ConcurrentFlowReleaseRequestData().setTokenId(tokenId);
ClusterRequest<ConcurrentFlowReleaseRequestData> request = new ClusterRequest<>(ClusterConstants.MSG_TYPE_CONCURRENT_FLOW_RELEASE, data);
try {
sendRequestIgnoreResponse(request);
} catch (Exception ex) {
ClusterClientStatLogUtil.log(ex.getMessage());
}
}

private void logForResult(TokenResult result) {
switch (result.getStatus()) {
case TokenResultStatus.NO_RULE_EXISTS:
Expand All @@ -197,19 +226,57 @@ private void logForResult(TokenResult result) {
private TokenResult sendTokenRequest(ClusterRequest request) throws Exception {
if (transportClient == null) {
RecordLog.warn(
"[DefaultClusterTokenClient] Client not created, please check your config for cluster client");
"[DefaultClusterTokenClient] Client not created, please check your config for cluster client");
return clientFail();
}
ClusterResponse response = transportClient.sendRequest(request);
TokenResult result = new TokenResult(response.getStatus());
if (response.getData() != null) {
FlowTokenResponseData responseData = (FlowTokenResponseData)response.getData();
result.setRemaining(responseData.getRemainingCount())
.setWaitInMs(responseData.getWaitInMs());
switch (request.getType()) {
case ClusterConstants.MSG_TYPE_CONCURRENT_FLOW_RELEASE:
break;
case ClusterConstants.MSG_TYPE_CONCURRENT_FLOW_ACQUIRE:
ConcurrentFlowAcquireResponseData concurrentAcquireResponseData = (ConcurrentFlowAcquireResponseData) response.getData();
result.setTokenId(concurrentAcquireResponseData.getTokenId());
break;
case ClusterConstants.MSG_TYPE_PARAM_FLOW:
case ClusterConstants.MSG_TYPE_FLOW:
FlowTokenResponseData responseData = (FlowTokenResponseData) response.getData();
result.setRemaining(responseData.getRemainingCount())
.setWaitInMs(responseData.getWaitInMs());
break;
default:
RecordLog.warn(
"[DefaultClusterTokenClient] Unknown request type: {}", request.getType());
}
}
return result;
}

private void sendRequestIgnoreResponse(ClusterRequest request) throws Exception {
if (transportClient == null) {
RecordLog.warn(
"[DefaultClusterTokenClient] Client not created, please check your config for cluster client");
return;
}
transportClient.sendRequestIgnoreResponse(request);
}

// private TokenResult sendTokenRequestTimeout(ClusterRequest request, long timeout) throws Exception {
// if (transportClient == null) {
// RecordLog.warn(
// "[DefaultClusterTokenClient] Client not created, please check your config for cluster client");
// return clientFail();
// }
// ClusterResponse response = transportClient.sendRequest(request, timeout);
// TokenResult result = new TokenResult(response.getStatus());
// if (response.getData() != null) {
// ConcurrentFlowAcquireResponseData concurrentAcquireResponseData = (ConcurrentFlowAcquireResponseData) response.getData();
// result.setTokenId(concurrentAcquireResponseData.getTokenId());
// }
// return result;
// }

private boolean notValidRequest(Long id, int count) {
return id == null || id <= 0 || count <= 0;
}
Expand Down
Loading