diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java index beda6504c6a..6ed015b9933 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java @@ -56,6 +56,7 @@ public void sendCheckMessage(MessageExt msgExt) throws Exception { checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX)); checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId()); checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset()); + checkTransactionStateRequestHeader.setBname(brokerController.getBrokerConfig().getBrokerName()); msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC)); msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID))); msgExt.setStoreSize(0); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index b327ee28b5b..5f393cb5726 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -1451,6 +1451,7 @@ public boolean registerClient(final String addr, final HeartbeatData heartbeat, public void consumerSendMessageBack( final String addr, + final String brokerName, final MessageExt msg, final String consumerGroup, final int delayLevel, @@ -1466,6 +1467,7 @@ public void consumerSendMessageBack( requestHeader.setDelayLevel(delayLevel); requestHeader.setOriginMsgId(msg.getMsgId()); requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes); + requestHeader.setBname(brokerName); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java index 66f3578fe08..96f31724e2c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -622,8 +622,8 @@ public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerN consumerGroup = this.defaultMQPullConsumer.getConsumerGroup(); } - this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, consumerGroup, delayLevel, 3000, - this.defaultMQPullConsumer.getMaxReconsumeTimes()); + this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, brokerName, msg, consumerGroup, + delayLevel, 3000, this.defaultMQPullConsumer.getMaxReconsumeTimes()); } catch (Exception e) { log.error("sendMessageBack Exception, " + this.defaultMQPullConsumer.getConsumerGroup(), e); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 7dc212dd14c..e5121dc73d1 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -732,7 +732,7 @@ private void sendMessageBack(MessageExt msg, int delayLevel, final String broker } else { String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName) : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()); - this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, + this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, brokerName, msg, this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes()); } } catch (Exception e) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index d9726631907..b40f536fdfc 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -375,6 +375,7 @@ private void processTransactionState( thisHeader.setProducerGroup(producerGroup); thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset()); thisHeader.setFromTransactionCheck(true); + thisHeader.setBname(checkRequestHeader.getBname()); String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (uniqueKey == null) { @@ -835,6 +836,7 @@ private SendResult sendKernelImpl(final Message msg, requestHeader.setReconsumeTimes(0); requestHeader.setUnitMode(this.isUnitMode()); requestHeader.setBatch(msg instanceof MessageBatch); + requestHeader.setBname(brokerName); if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { String reconsumeTimes = MessageAccessor.getReconsumeTime(msg); if (reconsumeTimes != null) { @@ -1365,6 +1367,7 @@ public void endTransaction( EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader(); requestHeader.setTransactionId(transactionId); requestHeader.setCommitLogOffset(id.getOffset()); + requestHeader.setBname(destBrokerName); switch (localTransactionState) { case COMMIT_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java index d62802c06ab..6ef4099b0ae 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java @@ -21,11 +21,11 @@ package org.apache.rocketmq.common.protocol.header; import com.google.common.base.MoreObjects; -import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.common.rpc.RpcRequestHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class CheckTransactionStateRequestHeader implements CommandCustomHeader { +public class CheckTransactionStateRequestHeader extends RpcRequestHeader { @CFNotNull private Long tranStateTableOffset; @CFNotNull diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java index 3d65f23921a..ee0416f52a8 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java @@ -18,12 +18,12 @@ package org.apache.rocketmq.common.protocol.header; import com.google.common.base.MoreObjects; -import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.common.rpc.RpcRequestHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class ConsumerSendMsgBackRequestHeader implements CommandCustomHeader { +public class ConsumerSendMsgBackRequestHeader extends RpcRequestHeader { @CFNotNull private Long offset; @CFNotNull diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionRequestHeader.java index 80fdc3d4a64..eabc4bed6ec 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionRequestHeader.java @@ -18,13 +18,13 @@ package org.apache.rocketmq.common.protocol.header; import com.google.common.base.MoreObjects; +import org.apache.rocketmq.common.rpc.RpcRequestHeader; import org.apache.rocketmq.common.sysflag.MessageSysFlag; -import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class EndTransactionRequestHeader implements CommandCustomHeader { +public class EndTransactionRequestHeader extends RpcRequestHeader { @CFNotNull private String producerGroup; @CFNotNull diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java index f4771252eb3..1985f65f49a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java @@ -59,6 +59,8 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode @CFNullable private boolean m; //batch + @CFNullable + private String n; // brokerName public static SendMessageRequestHeader createSendMessageRequestHeaderV1(final SendMessageRequestHeaderV2 v2) { SendMessageRequestHeader v1 = new SendMessageRequestHeader(); @@ -75,6 +77,7 @@ public static SendMessageRequestHeader createSendMessageRequestHeaderV1(final Se v1.setUnitMode(v2.k); v1.setMaxReconsumeTimes(v2.l); v1.setBatch(v2.m); + v1.setBname(v2.n); return v1; } @@ -93,6 +96,7 @@ public static SendMessageRequestHeaderV2 createSendMessageRequestHeaderV2(final v2.k = v1.isUnitMode(); v2.l = v1.getMaxReconsumeTimes(); v2.m = v1.isBatch(); + v2.n = v1.getBname(); return v2; } @@ -115,6 +119,7 @@ public void encode(ByteBuf out) { writeIfNotNull(out, "k", k); writeIfNotNull(out, "l", l); writeIfNotNull(out, "m", m); + writeIfNotNull(out, "n", n); } @Override @@ -184,6 +189,11 @@ public void decode(HashMap fields) throws RemotingCommandExcepti if (str != null) { m = Boolean.parseBoolean(str); } + + str = fields.get("n"); + if (str != null) { + n = str; + } } public String getA() { @@ -306,6 +316,7 @@ public String toString() { .add("k", k) .add("l", l) .add("m", m) + .add("n", n) .toString(); } } \ No newline at end of file