Skip to content

Commit

Permalink
代码优化
Browse files Browse the repository at this point in the history
  • Loading branch information
naivetoby committed Jan 8, 2024
1 parent 1b2b83b commit 8835c06
Show file tree
Hide file tree
Showing 15 changed files with 337 additions and 279 deletions.
176 changes: 136 additions & 40 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

<groupId>com.demo</groupId>
<artifactId>demo</artifactId>
<version>2.0.9</version>
<version>3.0.0</version>

<dependencies>
<dependency>
Expand All @@ -27,42 +27,100 @@
<dependency>
<groupId>vip.toby.rpc</groupId>
<artifactId>simple-rpc</artifactId>
<version>2.0.9</version>
<version>3.0.0</version>
</dependency>
</dependencies>
</project>
```

## RpcDTO
```java
@Data
@RpcDTO
public class PlusDTO {

@NotNull
@Min(1)
private Integer x;

private int y;

}

@Data
@RpcDTO
public class DelayPlusDTO extends RpcDelayDTO {

@NotNull
private Long createTime;

@NotNull
@Min(1)
private Integer x;

private int y;

}
```

## RpcClientConfig
```java
@Component
public class RpcClientConfig implements RpcClientConfigurer {

@Override
public void addRpcClientRegistry(RpcClientRegistry rpcClientRegistry) {
// 如果 @RpcClient 不在当前项目,可以手动配置
rpcClientRegistry.addRegistration(OtherSyncClient.class);
}

}
```

## RpcServer Demo
```java
@RpcServer(value="rpc-queue-name", type = {RpcType.SYNC, RpcType.ASYNC}, xMessageTTL = 1000, threadNum = 1)
@RpcServer(value = "rpc-queue-name", type = {RpcType.SYNC, RpcType.ASYNC}, xMessageTTL = 1000, threadNum = 1)
public class Server {

@RpcServerMethod
public ServerResult methodName1(JSONObject params) {
String param1 = params.getString("param1");
int param2 = params.getIntValue("param2");
JSONObject result = new JSONObject();
result.put("param1", param1);
result.put("param2", param2);
result.put("result", param1 + param2);
return ServerResult.buildSuccessResult(result).message("ok");
public R methodName1(@Validated PlusDTO plusDTO) {
final int x = plusDTO.getX();
final int y = plusDTO.getY();
return R.okResult(x + y);
}

@RpcServerMethod("methodName2-alias")
public ServerResult methodName2(JSONObject params) {
return ServerResult.buildFailureMessage("失败").errorCode(233);
public R methodName2(@Validated PlusDTO plusDTO) {
return R.failMessage("计算失败").errorCode(-9999);
}

}

@RpcServer(value = "rpc-queue-name-other", type = RpcType.SYNC)
public class OtherServer {

@RpcServerMethod
public ServerResult methodName3(@Validated PlusDTO plusDTO) {
int x = plusDTO.getX();
int y = plusDTO.getY();
JSONObject result = new JSONObject();
result.put("x", x);
result.put("y", y);
result.put("result", x + y);
return ServerResult.buildSuccessResult(result).message("ok");
public R methodName3(PlusDTO plusDTO) {
final int x = plusDTO.getX();
final int y = plusDTO.getY();
return R.okResult(x + y).message("计算成功, x: {}, y: {}", x, y);
}

}

@RpcServer(value = "delay-plus", type = RpcType.DELAY)
@Slf4j
public class DelayPlusServer {

@RpcServerMethod
public R delayPlus(@Validated DelayPlusDTO delayPlusDTO) {
final int x = delayPlusDTO.getX();
final int y = delayPlusDTO.getY();
final int delay = delayPlusDTO.getDelay();
final long createTime = delayPlusDTO.getCreateTime();
final long now = System.currentTimeMillis();
log.info("delayPlusDTO: {}, result: {}, delay: {}, duration: {}", delayPlusDTO, x + y, delay, now - createTime);
return R.ok();
}

}
Expand All @@ -74,39 +132,53 @@ public class Server {
public interface SyncClient {

@RpcClientMethod
RpcResult methodName1(String param1, int param2);
RpcResult methodName1(PlusDTO plusDTO);

@RpcClientMethod("methodName2-alias")
RpcResult methodName2(String param1, int param2);

@RpcClientMethod
RpcResult methodName3(PlusDTO plusDTO, JSONObject data, int x, int y);
RpcResult methodName2(PlusDTO plusDTO);

}

@RpcClient(value = "rpc-queue-name", type = RpcType.ASYNC)
public interface AsyncClient {

@RpcClientMethod
void methodName1(String param1, int param2);
void methodName1(PlusDTO plusDTO);

@RpcClientMethod("methodName2-alias")
void methodName2(String param1, int param2);
void methodName2(PlusDTO plusDTO);

}

@RpcClient(value = "rpc-queue-name-other", type = RpcType.SYNC)
public interface OtherSyncClient {

@RpcClientMethod
RpcResult methodName3(PlusDTO plusDTO);

}

@RpcClient(value = "delay-plus", type = RpcType.DELAY)
public interface DelayClient {

@RpcClientMethod
void delayPlus(DelayPlusDTO delayPlusDTO);

}
```

## Application Demo
```java

@EnableSimpleRpc
@SpringBootApplication
@RequiredArgsConstructor
@Slf4j
public class Application {

private final SyncClient syncClient;
private final AsyncClient asyncClient;
private final SyncClient syncClient;
private final OtherSyncClient otherSyncClient;
private final DelayClient delayClient;

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
Expand All @@ -115,24 +187,48 @@ public class Application {
@PostConstruct
public void test() {
new Thread(() -> {

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
syncClient.methodName1("param1", 2);
syncClient.methodName1("param1", 2);
syncClient.methodName1("dew", 46);

// 同步调用 1
final PlusDTO plusDTO = new PlusDTO();
plusDTO.setX(1);
plusDTO.setY(1);
final JSONObject data = new JSONObject();
data.put("x", 2);
data.put("y", 2);
final RpcResult rpcResult = syncClient.methodName3(plusDTO, data, 3, 3);
log.info("result: {}", rpcResult.getServerResult().getResult());
syncClient.methodName1("yyy", 2121);
asyncClient.methodName2("sss", 27);
RpcResult rpcResult = syncClient.methodName1(plusDTO);
log.info("syncClient.methodName1, RStatusOk: {}, RResult: {}", rpcResult.isRStatusOk(), rpcResult.getRResult());

// 同步调用 1-1
plusDTO.setX(0);
rpcResult = syncClient.methodName1(plusDTO);
log.info("syncClient.methodName1, RStatusOk: {}, ErrorMessage: {}, ErrorCode: {}", rpcResult.isRStatusOk(), rpcResult.getResult()
.getMessage(), rpcResult.getResult().getErrorCode());

// 同步调用 2
plusDTO.setX(2);
rpcResult = syncClient.methodName2(plusDTO);
log.info("syncClient.methodName2, RStatusOk: {}, ErrorMessage: {}, ErrorCode: {}", rpcResult.isRStatusOk(), rpcResult.getResult()
.getMessage(), rpcResult.getResult().getErrorCode());

// 异步调用
asyncClient.methodName2(plusDTO);

// 同步调用 3
rpcResult = otherSyncClient.methodName3(plusDTO);
log.info("otherSyncClient.methodName3, RStatusOk: {}, RResult: {}", rpcResult.isRStatusOk(), rpcResult.getRResult());

// 延迟调用, 注意⚠️ RabbitMQ 需要启用插件 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
final DelayPlusDTO delayPlusDTO = new DelayPlusDTO();
delayPlusDTO.setCreateTime(System.currentTimeMillis());
// 延迟 3 秒后调用
delayPlusDTO.setDelay(3000);
delayPlusDTO.setX(5);
delayPlusDTO.setY(8);
delayClient.delayPlus(delayPlusDTO);

}).start();
}

Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

<groupId>vip.toby.rpc</groupId>
<artifactId>simple-rpc</artifactId>
<version>2.0.9</version>
<version>3.0.0</version>
<packaging>jar</packaging>

<name>simple-rpc</name>
Expand Down Expand Up @@ -73,7 +73,7 @@
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.44</version>
<version>2.0.45</version>
<scope>compile</scope>
</dependency>
<dependency>
Expand Down
19 changes: 9 additions & 10 deletions src/main/java/vip/toby/rpc/client/RpcClientProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,24 +138,23 @@ public Object invoke(Object proxy, Method method, Object[] args) {
if (resultObj == null) {
// 无返回任何结果,说明服务器负载过高,没有及时处理请求,导致超时
log.error("Service Unavailable! Duration: {}ms, {}-RpcClient-{}, Method: {}, Param: {}", System.currentTimeMillis() - start, this.rpcType.getName(), this.rpcName, methodName, paramData);
return RpcResult.buildUnavailable();
return RpcResult.build(RpcStatus.UNAVAILABLE);
}
// 获取调用结果的状态
final JSONObject resultJson = JSONB.parseObject(resultObj.getBody());
final int status = resultJson.getIntValue("status");
final Object resultData = resultJson.get("data");
final ServerStatus serverStatus = ServerStatus.getServerStatus(status);
if (serverStatus != ServerStatus.SUCCESS || resultData == null) {
log.error("{}! Duration: {}ms, {}-RpcClient-{}, Method: {}, Param: {}", serverStatus.getMessage(), System.currentTimeMillis() - start, this.rpcType.getName(), this.rpcName, methodName, paramData);
return RpcResult.build(serverStatus);
final RpcStatus rpcStatus = RpcStatus.of(status);
if (rpcStatus != RpcStatus.OK || resultData == null) {
log.error("{}! Duration: {}ms, {}-RpcClient-{}, Method: {}, Param: {}", rpcStatus.getMessage(), System.currentTimeMillis() - start, this.rpcType.getName(), this.rpcName, methodName, paramData);
return RpcResult.build(rpcStatus);
}
// 获取操作层的状态
final JSONObject serverResultJson = JSON.parseObject(resultData.toString());
final RpcResult rpcResult = RpcResult.buildSuccess()
.result(ServerResult.build(OperateStatus.getOperateStatus(serverResultJson.getIntValue("status")))
.message(serverResultJson.getString("message"))
.result(serverResultJson.get("result"))
.errorCode(serverResultJson.getIntValue("errorCode")));
final RpcResult rpcResult = RpcResult.okResult(R.build(RStatus.of(serverResultJson.getIntValue("status")))
.message(serverResultJson.getString("message"))
.result(serverResultJson.get("result"))
.errorCode(serverResultJson.getIntValue("errorCode")));
final long offset = System.currentTimeMillis() - start;
if (offset > Math.floor(this.rpcProperties.getClientSlowCallTimePercent() * this.replyTimeout)) {
log.warn("Call Slowing! Duration: {}ms, {}-RpcClient-{}, Method: {}, Param: {}, RpcResult: {}", offset, this.rpcType.getName(), this.rpcName, methodName, paramData, rpcResult);
Expand Down
43 changes: 0 additions & 43 deletions src/main/java/vip/toby/rpc/entity/ErrorCode.java

This file was deleted.

36 changes: 0 additions & 36 deletions src/main/java/vip/toby/rpc/entity/OperateStatus.java

This file was deleted.

Loading

0 comments on commit 8835c06

Please sign in to comment.