Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
Signed-off-by: yunfeiyanggzq <yunfeiyang@buaa.edu.cn>
  • Loading branch information
yunfeiyanggzq committed Aug 11, 2020
1 parent 17c3ff7 commit 850a1de
Show file tree
Hide file tree
Showing 70 changed files with 2,820 additions and 574 deletions.
122 changes: 61 additions & 61 deletions sentinel-cluster/sentinel-cluster-client-default/pom.xml
Original file line number Diff line number Diff line change
@@ -1,62 +1,62 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>sentinel-cluster</artifactId>
<groupId>com.alibaba.csp</groupId>
<version>1.8.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>sentinel-cluster-client-default</artifactId>
<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-transport-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-cluster-common-default</artifactId>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
</dependency>

<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-nacos</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-parameter-flow-control</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>sentinel-cluster</artifactId>
<groupId>com.alibaba.csp</groupId>
<version>1.8.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>sentinel-cluster-client-default</artifactId>
<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-transport-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-cluster-common-default</artifactId>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
</dependency>

<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-nacos</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-parameter-flow-control</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public final class ClientConstants {
public static final int TYPE_PING = 0;
public static final int TYPE_FLOW = 1;
public static final int TYPE_PARAM_FLOW = 2;
public static final int TYPE_CONCURRENT_FLOW_ACQUIRE = 3;
public static final int TYPE_CONCURRENT_FLOW_RELEASE = 4;

public static final int CLIENT_STATUS_OFF = 0;
public static final int CLIENT_STATUS_PENDING = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,26 @@
*/
package com.alibaba.csp.sentinel.cluster.client;

import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;

import com.alibaba.csp.sentinel.cluster.ClusterConstants;
import com.alibaba.csp.sentinel.cluster.ClusterErrorMessages;
import com.alibaba.csp.sentinel.cluster.ClusterTransportClient;
import com.alibaba.csp.sentinel.cluster.TokenResult;
import com.alibaba.csp.sentinel.cluster.TokenResultStatus;
import com.alibaba.csp.sentinel.cluster.TokenServerDescriptor;
import com.alibaba.csp.sentinel.cluster.*;
import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientAssignConfig;
import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientConfigManager;
import com.alibaba.csp.sentinel.cluster.client.config.ServerChangeObserver;
import com.alibaba.csp.sentinel.cluster.log.ClusterClientStatLogUtil;
import com.alibaba.csp.sentinel.cluster.request.ClusterRequest;
import com.alibaba.csp.sentinel.cluster.request.data.ConcurrentFlowAcquireRequestData;
import com.alibaba.csp.sentinel.cluster.request.data.ConcurrentFlowReleaseRequestData;
import com.alibaba.csp.sentinel.cluster.request.data.FlowRequestData;
import com.alibaba.csp.sentinel.cluster.request.data.ParamFlowRequestData;
import com.alibaba.csp.sentinel.cluster.response.ClusterResponse;
import com.alibaba.csp.sentinel.cluster.response.data.ConcurrentFlowAcquireResponseData;
import com.alibaba.csp.sentinel.cluster.response.data.FlowTokenResponseData;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.util.StringUtil;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Default implementation of {@link ClusterTokenClient}.
*
Expand Down Expand Up @@ -152,7 +151,7 @@ public TokenResult requestToken(Long flowId, int acquireCount, boolean prioritiz
return badRequest();
}
FlowRequestData data = new FlowRequestData().setCount(acquireCount)
.setFlowId(flowId).setPriority(prioritized);
.setFlowId(flowId).setPriority(prioritized);
ClusterRequest<FlowRequestData> request = new ClusterRequest<>(ClusterConstants.MSG_TYPE_FLOW, data);
try {
TokenResult result = sendTokenRequest(request);
Expand All @@ -170,7 +169,7 @@ public TokenResult requestParamToken(Long flowId, int acquireCount, Collection<O
return badRequest();
}
ParamFlowRequestData data = new ParamFlowRequestData().setCount(acquireCount)
.setFlowId(flowId).setParams(params);
.setFlowId(flowId).setParams(params);
ClusterRequest<ParamFlowRequestData> request = new ClusterRequest<>(ClusterConstants.MSG_TYPE_PARAM_FLOW, data);
try {
TokenResult result = sendTokenRequest(request);
Expand All @@ -182,6 +181,40 @@ public TokenResult requestParamToken(Long flowId, int acquireCount, Collection<O
}
}

@Override
public TokenResult requestConcurrentToken(String clientAddress, Long ruleId, int acquireCount) {
if (notValidRequest(ruleId, acquireCount)) {
return badRequest();
}
ConcurrentFlowAcquireRequestData data = new ConcurrentFlowAcquireRequestData().setFlowId(ruleId).setCount(acquireCount);
ClusterRequest<ConcurrentFlowAcquireRequestData> request = new ClusterRequest<>(ClusterConstants.MSG_TYPE_CONCURRENT_FLOW_ACQUIRE, data);
try {
TokenResult result = sendTokenRequest(request);
logForResult(result);
return result;
} catch (Exception ex) {
ClusterClientStatLogUtil.log(ex.getMessage());
return new TokenResult(TokenResultStatus.FAIL);
}
}

@Override
public TokenResult releaseConcurrentToken(Long tokenId) {
if (tokenId == null) {
return badRequest();
}
ConcurrentFlowReleaseRequestData data = new ConcurrentFlowReleaseRequestData().setTokenId(tokenId);
ClusterRequest<ConcurrentFlowReleaseRequestData> request = new ClusterRequest<>(ClusterConstants.MSG_TYPE_CONCURRENT_FLOW_RELEASE, data);
try {
TokenResult result = sendTokenRequest(request);
logForResult(result);
return result;
} catch (Exception ex) {
ClusterClientStatLogUtil.log(ex.getMessage());
return new TokenResult(TokenResultStatus.FAIL);
}
}

private void logForResult(TokenResult result) {
switch (result.getStatus()) {
case TokenResultStatus.NO_RULE_EXISTS:
Expand All @@ -197,15 +230,29 @@ private void logForResult(TokenResult result) {
private TokenResult sendTokenRequest(ClusterRequest request) throws Exception {
if (transportClient == null) {
RecordLog.warn(
"[DefaultClusterTokenClient] Client not created, please check your config for cluster client");
"[DefaultClusterTokenClient] Client not created, please check your config for cluster client");
return clientFail();
}
ClusterResponse response = transportClient.sendRequest(request);
TokenResult result = new TokenResult(response.getStatus());
if (response.getData() != null) {
FlowTokenResponseData responseData = (FlowTokenResponseData)response.getData();
result.setRemaining(responseData.getRemainingCount())
.setWaitInMs(responseData.getWaitInMs());
switch (request.getType()) {
case ClusterConstants.MSG_TYPE_CONCURRENT_FLOW_RELEASE:
break;
case ClusterConstants.MSG_TYPE_CONCURRENT_FLOW_ACQUIRE:
ConcurrentFlowAcquireResponseData concurrentAcquireResponseData = (ConcurrentFlowAcquireResponseData) response.getData();
result.setTokenId(concurrentAcquireResponseData.getTokenId());
break;
case ClusterConstants.MSG_TYPE_PARAM_FLOW:
case ClusterConstants.MSG_TYPE_FLOW:
FlowTokenResponseData responseData = (FlowTokenResponseData) response.getData();
result.setRemaining(responseData.getRemainingCount())
.setWaitInMs(responseData.getWaitInMs());
break;
default:
RecordLog.warn(
"[DefaultClusterTokenClient] Unknown request type: {}", request.getType());
}
}
return result;
}
Expand Down
Loading

0 comments on commit 850a1de

Please sign in to comment.