Skip to content

Commit

Permalink
optimize : Eliminate RpcMessage and Encoder/Decoder dependencies (#6209)
Browse files Browse the repository at this point in the history
  • Loading branch information
Bughue authored Jun 24, 2024
1 parent b51187e commit eb19e98
Show file tree
Hide file tree
Showing 12 changed files with 467 additions and 18 deletions.
1 change: 1 addition & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Add changes here for all PR submitted to the 2.x branch.
### optimize:
- [[#6499](https://github.com/apache/incubator-seata/pull/6499)] split the task thread pool for committing and rollbacking statuses
- [[#6208](https://github.com/apache/incubator-seata/pull/6208)] optimize : load SeataSerializer by version
- [[#6209](https://github.com/apache/incubator-seata/pull/6209)] Eliminate RpcMessage and Encoder/Decoder dependencies

### refactor:
- [[#6534](https://github.com/apache/incubator-seata/pull/6534)] optimize: send async response
Expand Down
2 changes: 1 addition & 1 deletion changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
### optimize:
- [[#6499](https://github.com/apache/incubator-seata/pull/6499)] 拆分 committing 和 rollbacking 状态的任务线程池
- [[#6208](https://github.com/apache/incubator-seata/pull/6208)] 支持多版本的Seata序列化

- [[#6209](https://github.com/apache/incubator-seata/pull/6209)] 解开 RpcMessage 和 Encoder/Decoder 的互相依赖

### refactor:
- [[#6534](https://github.com/apache/incubator-seata/pull/6534)] 优化: 发送异步响应
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,10 +409,13 @@ class ClientHandler extends ChannelDuplexHandler {

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof RpcMessage)) {
return;
RpcMessage rpcMessage = null;
if (msg instanceof ProtocolRpcMessage) {
rpcMessage = ((ProtocolRpcMessage) msg).protocolMsg2RpcMsg();
processMessage(ctx, rpcMessage);
} else {
LOGGER.error("rpcMessage type error");
}
processMessage(ctx, (RpcMessage) msg);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,13 @@ class ServerHandler extends ChannelDuplexHandler {
*/
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof RpcMessage)) {
return;
RpcMessage rpcMessage = null;
if (msg instanceof ProtocolRpcMessage) {
rpcMessage = ((ProtocolRpcMessage) msg).protocolMsg2RpcMsg();
processMessage(ctx, rpcMessage);
} else {
LOGGER.error("rpcMessage type error");
}
processMessage(ctx, (RpcMessage) msg);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.core.rpc.netty;

import org.apache.seata.core.protocol.RpcMessage;

/**
* The protocol RPC message.
*/
public interface ProtocolRpcMessage {

/**
* The protocol message to rpc message.
* @return
*/
RpcMessage protocolMsg2RpcMsg();

/**
* The rpc message to protocol message.
* @param rpcMessage
*/
void rpcMsg2ProtocolMsg(RpcMessage rpcMessage);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.core.rpc.netty.v0;

import org.apache.seata.core.compressor.CompressorType;
import org.apache.seata.core.protocol.ProtocolConstants;
import org.apache.seata.core.protocol.RpcMessage;
import org.apache.seata.core.rpc.netty.ProtocolRpcMessage;
import org.apache.seata.core.serializer.SerializerType;

import java.util.concurrent.atomic.AtomicLong;

/**
* the protocol v0 rpc message
**/
public class ProtocolRpcMessageV0 implements ProtocolRpcMessage {

private static AtomicLong NEXT_ID = new AtomicLong(0);

/**
* Gets next message id.
*
* @return the next message id
*/
public static long getNextMessageId() {
return NEXT_ID.incrementAndGet();
}

private long id;
private boolean isAsync;
private boolean isRequest;
private boolean isHeartbeat;
private Object body;
private byte messageType;
private boolean isSeataCodec;

/**
* Gets id.
*
* @return the id
*/
public long getId() {
return id;
}

/**
* Sets id.
*
* @param id the id
*/
public void setId(long id) {
this.id = id;
}

/**
* Is async boolean.
*
* @return the boolean
*/
public boolean isAsync() {
return isAsync;
}

/**
* Sets async.
*
* @param async the async
*/
public void setAsync(boolean async) {
isAsync = async;
}

/**
* Is request boolean.
*
* @return the boolean
*/
public boolean isRequest() {
return isRequest;
}

/**
* Sets request.
*
* @param request the request
*/
public void setRequest(boolean request) {
isRequest = request;
}

/**
* Is heartbeat boolean.
*
* @return the boolean
*/
public boolean isHeartbeat() {
return isHeartbeat;
}

/**
* Sets heartbeat.
*
* @param heartbeat the heartbeat
*/
public void setHeartbeat(boolean heartbeat) {
isHeartbeat = heartbeat;
}

/**
* Gets body.
*
* @return the body
*/
public Object getBody() {
return body;
}

/**
* Sets body.
*
* @param body the body
*/
public void setBody(Object body) {
this.body = body;
}

public boolean isSeataCodec() {
return isSeataCodec;
}

public void setSeataCodec(boolean seataCodec) {
isSeataCodec = seataCodec;
}

public byte getMessageType() {
return messageType;
}

public void setMessageType(byte messageType) {
this.messageType = messageType;
}

@Override
public RpcMessage protocolMsg2RpcMsg() {
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setMessageType(this.messageType);
rpcMessage.setCompressor(CompressorType.NONE.getCode());

byte codecType = this.isSeataCodec ? SerializerType.SEATA.getCode() : SerializerType.HESSIAN.getCode();
rpcMessage.setCodec(codecType);

if (this.isHeartbeat) {
if (this.isRequest) {
rpcMessage.setMessageType(ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST);
} else {
rpcMessage.setMessageType(ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE);
}
} else {
if (this.isRequest) {
rpcMessage.setMessageType(ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
} else {
rpcMessage.setMessageType(ProtocolConstants.MSGTYPE_RESPONSE);
}
}
rpcMessage.setBody(this.body);
rpcMessage.setId((int) this.id);
return rpcMessage;
}

@Override
public void rpcMsg2ProtocolMsg(RpcMessage rpcMessage) {
this.body = rpcMessage.getBody();
this.id = rpcMessage.getId();
this.isRequest = isRequest(rpcMessage.getMessageType());
this.isHeartbeat = isHeartbeat(rpcMessage.getMessageType());
this.isSeataCodec = rpcMessage.getCodec() == SerializerType.SEATA.getCode();
this.messageType = rpcMessage.getMessageType();
}

private boolean isHeartbeat(byte msgType) {
return msgType == ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
|| msgType == ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE;
}

private boolean isRequest(byte msgType) {
return msgType == ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY
|| msgType == ProtocolConstants.MSGTYPE_RESQUEST_SYNC;
}
}
Loading

0 comments on commit eb19e98

Please sign in to comment.