Skip to content

Commit

Permalink
[ISSUE #3905] Support bname in protocol for 5.0 client
Browse files Browse the repository at this point in the history
* add bname for `CheckTransactionStateRequestHeader`, `ConsumerSendMsgBackRequestHeader`, `EndTransactionRequestHeader`,
`SendMessageRequestHeader`
  • Loading branch information
drpmma committed Oct 17, 2022
1 parent ff60d5c commit 938d59e
Show file tree
Hide file tree
Showing 9 changed files with 26 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public void sendCheckMessage(MessageExt msgExt) throws Exception {
checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
checkTransactionStateRequestHeader.setBname(brokerController.getBrokerConfig().getBrokerName());
msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
msgExt.setStoreSize(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1451,6 +1451,7 @@ public boolean registerClient(final String addr, final HeartbeatData heartbeat,

public void consumerSendMessageBack(
final String addr,
final String brokerName,
final MessageExt msg,
final String consumerGroup,
final int delayLevel,
Expand All @@ -1466,6 +1467,7 @@ public void consumerSendMessageBack(
requestHeader.setDelayLevel(delayLevel);
requestHeader.setOriginMsgId(msg.getMsgId());
requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
requestHeader.setBname(brokerName);

RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -622,8 +622,8 @@ public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerN
consumerGroup = this.defaultMQPullConsumer.getConsumerGroup();
}

this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, consumerGroup, delayLevel, 3000,
this.defaultMQPullConsumer.getMaxReconsumeTimes());
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, brokerName, msg, consumerGroup,
delayLevel, 3000, this.defaultMQPullConsumer.getMaxReconsumeTimes());
} catch (Exception e) {
log.error("sendMessageBack Exception, " + this.defaultMQPullConsumer.getConsumerGroup(), e);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ private void sendMessageBack(MessageExt msg, int delayLevel, final String broker
} else {
String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
: RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, brokerName, msg,
this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ private void processTransactionState(
thisHeader.setProducerGroup(producerGroup);
thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
thisHeader.setFromTransactionCheck(true);
thisHeader.setBname(checkRequestHeader.getBname());

String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (uniqueKey == null) {
Expand Down Expand Up @@ -835,6 +836,7 @@ private SendResult sendKernelImpl(final Message msg,
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
requestHeader.setBname(brokerName);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
Expand Down Expand Up @@ -1365,6 +1367,7 @@ public void endTransaction(
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
requestHeader.setBname(destBrokerName);
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
package org.apache.rocketmq.common.protocol.header;

import com.google.common.base.MoreObjects;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.common.rpc.RpcRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;

public class CheckTransactionStateRequestHeader implements CommandCustomHeader {
public class CheckTransactionStateRequestHeader extends RpcRequestHeader {
@CFNotNull
private Long tranStateTableOffset;
@CFNotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
package org.apache.rocketmq.common.protocol.header;

import com.google.common.base.MoreObjects;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.common.rpc.RpcRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;

public class ConsumerSendMsgBackRequestHeader implements CommandCustomHeader {
public class ConsumerSendMsgBackRequestHeader extends RpcRequestHeader {
@CFNotNull
private Long offset;
@CFNotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
package org.apache.rocketmq.common.protocol.header;

import com.google.common.base.MoreObjects;
import org.apache.rocketmq.common.rpc.RpcRequestHeader;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;

public class EndTransactionRequestHeader implements CommandCustomHeader {
public class EndTransactionRequestHeader extends RpcRequestHeader {
@CFNotNull
private String producerGroup;
@CFNotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode

@CFNullable
private boolean m; //batch
@CFNullable
private String n; // brokerName

public static SendMessageRequestHeader createSendMessageRequestHeaderV1(final SendMessageRequestHeaderV2 v2) {
SendMessageRequestHeader v1 = new SendMessageRequestHeader();
Expand All @@ -75,6 +77,7 @@ public static SendMessageRequestHeader createSendMessageRequestHeaderV1(final Se
v1.setUnitMode(v2.k);
v1.setMaxReconsumeTimes(v2.l);
v1.setBatch(v2.m);
v1.setBname(v2.n);
return v1;
}

Expand All @@ -93,6 +96,7 @@ public static SendMessageRequestHeaderV2 createSendMessageRequestHeaderV2(final
v2.k = v1.isUnitMode();
v2.l = v1.getMaxReconsumeTimes();
v2.m = v1.isBatch();
v2.n = v1.getBname();
return v2;
}

Expand All @@ -115,6 +119,7 @@ public void encode(ByteBuf out) {
writeIfNotNull(out, "k", k);
writeIfNotNull(out, "l", l);
writeIfNotNull(out, "m", m);
writeIfNotNull(out, "n", n);
}

@Override
Expand Down Expand Up @@ -184,6 +189,11 @@ public void decode(HashMap<String, String> fields) throws RemotingCommandExcepti
if (str != null) {
m = Boolean.parseBoolean(str);
}

str = fields.get("n");
if (str != null) {
n = str;
}
}

public String getA() {
Expand Down Expand Up @@ -306,6 +316,7 @@ public String toString() {
.add("k", k)
.add("l", l)
.add("m", m)
.add("n", n)
.toString();
}
}

0 comments on commit 938d59e

Please sign in to comment.