From 2371a2923ffffe4059f0b3e570e4712e75396dd0 Mon Sep 17 00:00:00 2001 From: Zhouxiang Zhan Date: Mon, 26 Sep 2022 19:20:22 +0800 Subject: [PATCH] [ISSUE #3905] Support bname in protocol for 4.9.x client (#5161) As this is a backported patch. For the purpose of compatibility, let's merge it as it is. --- .../processor/ReplyMessageProcessor.java | 1 + .../store/RemoteBrokerOffsetStore.java | 2 ++ .../rocketmq/client/impl/MQAdminImpl.java | 9 +++--- .../rocketmq/client/impl/MQClientAPIImpl.java | 31 ++++++++++-------- .../consumer/DefaultMQPullConsumerImpl.java | 4 +-- .../consumer/DefaultMQPushConsumerImpl.java | 2 +- .../client/impl/consumer/PullAPIWrapper.java | 1 + .../impl/producer/DefaultMQProducerImpl.java | 5 +++ .../CheckTransactionStateRequestHeader.java | 13 ++++++-- .../ConsumerSendMsgBackRequestHeader.java | 4 +-- .../header/EndTransactionRequestHeader.java | 16 ++++++++-- .../GetEarliestMsgStoretimeRequestHeader.java | 4 +-- .../header/GetMaxOffsetRequestHeader.java | 4 +-- .../header/GetMinOffsetRequestHeader.java | 4 +-- .../header/PullMessageRequestHeader.java | 15 ++++++--- .../QueryConsumerOffsetRequestHeader.java | 4 +-- .../header/ReplyMessageRequestHeader.java | 4 +-- .../header/SearchOffsetRequestHeader.java | 4 +-- .../header/SendMessageRequestHeader.java | 4 +-- .../header/SendMessageRequestHeaderV2.java | 20 ++++++++---- .../UpdateConsumerOffsetRequestHeader.java | 4 +-- .../rocketmq/common/rpc/RpcRequestHeader.java | 32 +++++++++++++++++++ .../tools/admin/DefaultMQAdminExtImpl.java | 8 ++--- 23 files changed, 137 insertions(+), 58 deletions(-) create mode 100644 common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java index 133165b9f7d..42b2edb6c68 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java @@ -173,6 +173,7 @@ private PushReplyResult pushReplyMessage(final ChannelHandlerContext ctx, replyMessageRequestHeader.setProperties(requestHeader.getProperties()); replyMessageRequestHeader.setReconsumeTimes(requestHeader.getReconsumeTimes()); replyMessageRequestHeader.setUnitMode(requestHeader.isUnitMode()); + replyMessageRequestHeader.setBname(requestHeader.getBname()); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT, replyMessageRequestHeader); request.setBody(msg.getBody()); diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java index 409ceab9544..f2cb13a8c43 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java @@ -211,6 +211,7 @@ public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean is requestHeader.setConsumerGroup(this.groupName); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setCommitOffset(offset); + requestHeader.setBname(mq.getBrokerName()); if (isOneway) { this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway( @@ -238,6 +239,7 @@ private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingExcept requestHeader.setTopic(mq.getTopic()); requestHeader.setConsumerGroup(this.groupName); requestHeader.setQueueId(mq.getQueueId()); + requestHeader.setBname(mq.getBrokerName()); return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset( findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java index ba4eafae97e..49ffef9c0ab 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java @@ -191,7 +191,7 @@ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientExcepti if (brokerAddr != null) { try { - return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timestamp, + return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq, timestamp, timeoutMillis); } catch (Exception e) { throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e); @@ -210,7 +210,7 @@ public long maxOffset(MessageQueue mq) throws MQClientException { if (brokerAddr != null) { try { - return this.mQClientFactory.getMQClientAPIImpl().getMaxOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timeoutMillis); + return this.mQClientFactory.getMQClientAPIImpl().getMaxOffset(brokerAddr, mq, timeoutMillis); } catch (Exception e) { throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e); } @@ -228,7 +228,7 @@ public long minOffset(MessageQueue mq) throws MQClientException { if (brokerAddr != null) { try { - return this.mQClientFactory.getMQClientAPIImpl().getMinOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timeoutMillis); + return this.mQClientFactory.getMQClientAPIImpl().getMinOffset(brokerAddr, mq, timeoutMillis); } catch (Exception e) { throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e); } @@ -246,8 +246,7 @@ public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { if (brokerAddr != null) { try { - return this.mQClientFactory.getMQClientAPIImpl().getEarliestMsgStoretime(brokerAddr, mq.getTopic(), mq.getQueueId(), - timeoutMillis); + return this.mQClientFactory.getMQClientAPIImpl().getEarliestMsgStoretime(brokerAddr, mq, timeoutMillis); } catch (Exception e) { throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index bec1a568456..b2b8b9bd58a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -836,13 +836,14 @@ public MessageExt viewMessage(final String addr, final long phyoffset, final lon throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } - public long searchOffset(final String addr, final String topic, final int queueId, final long timestamp, + public long searchOffset(final String addr, final MessageQueue mq, final long timestamp, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException { SearchOffsetRequestHeader requestHeader = new SearchOffsetRequestHeader(); - requestHeader.setTopic(topic); - requestHeader.setQueueId(queueId); + requestHeader.setTopic(mq.getTopic()); + requestHeader.setQueueId(mq.getQueueId()); requestHeader.setTimestamp(timestamp); + requestHeader.setBname(mq.getBrokerName()); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), @@ -861,11 +862,12 @@ public long searchOffset(final String addr, final String topic, final int queueI throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } - public long getMaxOffset(final String addr, final String topic, final int queueId, final long timeoutMillis) + public long getMaxOffset(final String addr, final MessageQueue mq, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException { GetMaxOffsetRequestHeader requestHeader = new GetMaxOffsetRequestHeader(); - requestHeader.setTopic(topic); - requestHeader.setQueueId(queueId); + requestHeader.setTopic(mq.getTopic()); + requestHeader.setQueueId(mq.getQueueId()); + requestHeader.setBname(mq.getBrokerName()); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), @@ -912,11 +914,12 @@ public List getConsumerIdListByGroup( throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } - public long getMinOffset(final String addr, final String topic, final int queueId, final long timeoutMillis) + public long getMinOffset(final String addr, final MessageQueue mq, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException { GetMinOffsetRequestHeader requestHeader = new GetMinOffsetRequestHeader(); - requestHeader.setTopic(topic); - requestHeader.setQueueId(queueId); + requestHeader.setTopic(mq.getTopic()); + requestHeader.setQueueId(mq.getQueueId()); + requestHeader.setBname(mq.getBrokerName()); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MIN_OFFSET, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), @@ -936,12 +939,12 @@ public long getMinOffset(final String addr, final String topic, final int queueI throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } - public long getEarliestMsgStoretime(final String addr, final String topic, final int queueId, - final long timeoutMillis) + public long getEarliestMsgStoretime(final String addr, final MessageQueue mq, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException { GetEarliestMsgStoretimeRequestHeader requestHeader = new GetEarliestMsgStoretimeRequestHeader(); - requestHeader.setTopic(topic); - requestHeader.setQueueId(queueId); + requestHeader.setTopic(mq.getTopic()); + requestHeader.setQueueId(mq.getQueueId()); + requestHeader.setBname(mq.getBrokerName()); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_EARLIEST_MSG_STORETIME, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), @@ -1100,6 +1103,7 @@ public boolean registerClient(final String addr, final HeartbeatData heartbeat, public void consumerSendMessageBack( final String addr, + final String brokerName, final MessageExt msg, final String consumerGroup, final int delayLevel, @@ -1115,6 +1119,7 @@ public void consumerSendMessageBack( requestHeader.setDelayLevel(delayLevel); requestHeader.setOriginMsgId(msg.getMsgId()); requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes); + requestHeader.setBname(brokerName); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java index 7319fdad740..dfcc23e3a58 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -583,8 +583,8 @@ public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerN consumerGroup = this.defaultMQPullConsumer.getConsumerGroup(); } - this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, consumerGroup, delayLevel, 3000, - this.defaultMQPullConsumer.getMaxReconsumeTimes()); + this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, brokerName, msg, + consumerGroup, delayLevel, 3000, this.defaultMQPullConsumer.getMaxReconsumeTimes()); } catch (Exception e) { log.error("sendMessageBack Exception, " + this.defaultMQPullConsumer.getConsumerGroup(), e); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 05ced26c479..df68284d49d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -515,7 +515,7 @@ public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerN try { String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName) : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()); - this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, + this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, brokerName, msg, this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes()); } catch (Exception e) { log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java index cc42a9e830e..33021bf6b2e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java @@ -191,6 +191,7 @@ public PullResult pullKernelImpl( requestHeader.setSubscription(subExpression); requestHeader.setSubVersion(subVersion); requestHeader.setExpressionType(expressionType); + requestHeader.setBname(mq.getBrokerName()); String brokerAddr = findBrokerResult.getBrokerAddr(); if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 668f9b6b690..deb49e755a7 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -350,6 +350,8 @@ private void processTransactionState( thisHeader.setProducerGroup(producerGroup); thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset()); thisHeader.setFromTransactionCheck(true); + thisHeader.setBname(checkRequestHeader.getBname()); + thisHeader.setQueueId(checkRequestHeader.getQueueId()); String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (uniqueKey == null) { @@ -774,6 +776,7 @@ private SendResult sendKernelImpl(final Message msg, requestHeader.setReconsumeTimes(0); requestHeader.setUnitMode(this.isUnitMode()); requestHeader.setBatch(msg instanceof MessageBatch); + requestHeader.setBname(mq.getBrokerName()); if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { String reconsumeTimes = MessageAccessor.getReconsumeTime(msg); if (reconsumeTimes != null) { @@ -1323,6 +1326,8 @@ public void endTransaction( EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader(); requestHeader.setTransactionId(transactionId); requestHeader.setCommitLogOffset(id.getOffset()); + requestHeader.setBname(sendResult.getMessageQueue().getBrokerName()); + requestHeader.setQueueId(sendResult.getMessageQueue().getQueueId()); switch (localTransactionState) { case COMMIT_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java index 6cba71c7e91..8c4b87c4a0b 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java @@ -20,11 +20,11 @@ */ package org.apache.rocketmq.common.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.common.rpc.RpcRequestHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class CheckTransactionStateRequestHeader implements CommandCustomHeader { +public class CheckTransactionStateRequestHeader extends RpcRequestHeader { @CFNotNull private Long tranStateTableOffset; @CFNotNull @@ -32,6 +32,7 @@ public class CheckTransactionStateRequestHeader implements CommandCustomHeader { private String msgId; private String transactionId; private String offsetMsgId; + private int queueId; @Override public void checkFields() throws RemotingCommandException { @@ -76,4 +77,12 @@ public String getOffsetMsgId() { public void setOffsetMsgId(String offsetMsgId) { this.offsetMsgId = offsetMsgId; } + + public int getQueueId() { + return queueId; + } + + public void setQueueId(int queueId) { + this.queueId = queueId; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java index bd8fbb44ca0..f5dda1ba497 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java @@ -17,12 +17,12 @@ package org.apache.rocketmq.common.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.common.rpc.RpcRequestHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class ConsumerSendMsgBackRequestHeader implements CommandCustomHeader { +public class ConsumerSendMsgBackRequestHeader extends RpcRequestHeader { @CFNotNull private Long offset; @CFNotNull diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionRequestHeader.java index 87661c320ad..42a09e8e523 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionRequestHeader.java @@ -17,13 +17,13 @@ package org.apache.rocketmq.common.protocol.header; +import org.apache.rocketmq.common.rpc.RpcRequestHeader; import org.apache.rocketmq.common.sysflag.MessageSysFlag; -import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class EndTransactionRequestHeader implements CommandCustomHeader { +public class EndTransactionRequestHeader extends RpcRequestHeader { @CFNotNull private String producerGroup; @CFNotNull @@ -43,6 +43,8 @@ public class EndTransactionRequestHeader implements CommandCustomHeader { private String transactionId; + private int queueId; + @Override public void checkFields() throws RemotingCommandException { if (MessageSysFlag.TRANSACTION_NOT_TYPE == this.commitOrRollback) { @@ -126,6 +128,16 @@ public String toString() { ", fromTransactionCheck=" + fromTransactionCheck + ", msgId='" + msgId + '\'' + ", transactionId='" + transactionId + '\'' + + ", queueId=" + queueId + + ", bname='" + bname + '\'' + '}'; } + + public int getQueueId() { + return queueId; + } + + public void setQueueId(int queueId) { + this.queueId = queueId; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java index c64381fb787..f75494e51e7 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java @@ -20,11 +20,11 @@ */ package org.apache.rocketmq.common.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.common.rpc.RpcRequestHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class GetEarliestMsgStoretimeRequestHeader implements CommandCustomHeader { +public class GetEarliestMsgStoretimeRequestHeader extends RpcRequestHeader { @CFNotNull private String topic; @CFNotNull diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java index 871309de6ce..dfa05e864a1 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java @@ -20,11 +20,11 @@ */ package org.apache.rocketmq.common.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.common.rpc.RpcRequestHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class GetMaxOffsetRequestHeader implements CommandCustomHeader { +public class GetMaxOffsetRequestHeader extends RpcRequestHeader { @CFNotNull private String topic; @CFNotNull diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java index 6fb8ed40c45..3a02634c809 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java @@ -20,11 +20,11 @@ */ package org.apache.rocketmq.common.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.common.rpc.RpcRequestHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class GetMinOffsetRequestHeader implements CommandCustomHeader { +public class GetMinOffsetRequestHeader extends RpcRequestHeader { @CFNotNull private String topic; @CFNotNull diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java index 02fac8e59f7..440e5c607eb 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java @@ -20,17 +20,15 @@ */ package org.apache.rocketmq.common.protocol.header; +import io.netty.buffer.ByteBuf; import java.util.HashMap; - -import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.common.rpc.RpcRequestHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.protocol.FastCodesHeader; -import io.netty.buffer.ByteBuf; - -public class PullMessageRequestHeader implements CommandCustomHeader, FastCodesHeader { +public class PullMessageRequestHeader extends RpcRequestHeader implements FastCodesHeader { @CFNotNull private String consumerGroup; @CFNotNull @@ -51,6 +49,7 @@ public class PullMessageRequestHeader implements CommandCustomHeader, FastCodesH private String subscription; @CFNotNull private Long subVersion; + @CFNullable private String expressionType; @Override @@ -70,6 +69,7 @@ public void encode(ByteBuf out) { writeIfNotNull(out, "subscription", subscription); writeIfNotNull(out, "subVersion", subVersion); writeIfNotNull(out, "expressionType", expressionType); + writeIfNotNull(out, "bname", bname); } @Override @@ -128,6 +128,11 @@ public void decode(HashMap fields) throws RemotingCommandExcepti if (str != null) { this.expressionType = str; } + + str = fields.get("bname"); + if (str != null) { + this.bname = str; + } } public String getConsumerGroup() { diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java index 3b7f627c35a..195f4640039 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java @@ -20,11 +20,11 @@ */ package org.apache.rocketmq.common.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.common.rpc.RpcRequestHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class QueryConsumerOffsetRequestHeader implements CommandCustomHeader { +public class QueryConsumerOffsetRequestHeader extends RpcRequestHeader { @CFNotNull private String consumerGroup; @CFNotNull diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ReplyMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ReplyMessageRequestHeader.java index 3bb09073f72..aa747e9f424 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ReplyMessageRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ReplyMessageRequestHeader.java @@ -17,12 +17,12 @@ package org.apache.rocketmq.common.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.common.rpc.RpcRequestHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class ReplyMessageRequestHeader implements CommandCustomHeader { +public class ReplyMessageRequestHeader extends RpcRequestHeader { @CFNotNull private String producerGroup; @CFNotNull diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java index 5ea2e24bfc8..cd9f9e18b46 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java @@ -20,11 +20,11 @@ */ package org.apache.rocketmq.common.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.common.rpc.RpcRequestHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class SearchOffsetRequestHeader implements CommandCustomHeader { +public class SearchOffsetRequestHeader extends RpcRequestHeader { @CFNotNull private String topic; @CFNotNull diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java index 2df31e6bb2a..8fa7737f500 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java @@ -20,12 +20,12 @@ */ package org.apache.rocketmq.common.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.common.rpc.RpcRequestHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class SendMessageRequestHeader implements CommandCustomHeader { +public class SendMessageRequestHeader extends RpcRequestHeader { @CFNotNull private String producerGroup; @CFNotNull diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java index ff9457e2838..45c49ef1aa3 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java @@ -17,20 +17,18 @@ package org.apache.rocketmq.common.protocol.header; +import io.netty.buffer.ByteBuf; import java.util.HashMap; - -import org.apache.rocketmq.remoting.protocol.FastCodesHeader; -import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.common.rpc.RpcRequestHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.exception.RemotingCommandException; - -import io.netty.buffer.ByteBuf; +import org.apache.rocketmq.remoting.protocol.FastCodesHeader; /** * Use short variable name to speed up FastJson deserialization process. */ -public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCodesHeader { +public class SendMessageRequestHeaderV2 extends RpcRequestHeader implements FastCodesHeader { @CFNotNull private String a; // producerGroup; @CFNotNull @@ -59,6 +57,8 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode @CFNullable private boolean m; //batch + private String n; // brokerName + public static SendMessageRequestHeader createSendMessageRequestHeaderV1(final SendMessageRequestHeaderV2 v2) { SendMessageRequestHeader v1 = new SendMessageRequestHeader(); v1.setProducerGroup(v2.a); @@ -74,6 +74,7 @@ public static SendMessageRequestHeader createSendMessageRequestHeaderV1(final Se v1.setUnitMode(v2.k); v1.setMaxReconsumeTimes(v2.l); v1.setBatch(v2.m); + v1.setBname(v2.n); return v1; } @@ -92,6 +93,7 @@ public static SendMessageRequestHeaderV2 createSendMessageRequestHeaderV2(final v2.k = v1.isUnitMode(); v2.l = v1.getMaxReconsumeTimes(); v2.m = v1.isBatch(); + v2.n = v1.getBname(); return v2; } @@ -114,6 +116,7 @@ public void encode(ByteBuf out) { writeIfNotNull(out, "k", k); writeIfNotNull(out, "l", l); writeIfNotNull(out, "m", m); + writeIfNotNull(out, "n", n); } @Override @@ -183,6 +186,11 @@ public void decode(HashMap fields) throws RemotingCommandExcepti if (str != null) { m = Boolean.parseBoolean(str); } + + str = fields.get("n"); + if (str != null) { + n = str; + } } public String getA() { diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java index 3f44db645c5..e6b0e950899 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java @@ -20,11 +20,11 @@ */ package org.apache.rocketmq.common.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.common.rpc.RpcRequestHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class UpdateConsumerOffsetRequestHeader implements CommandCustomHeader { +public class UpdateConsumerOffsetRequestHeader extends RpcRequestHeader { @CFNotNull private String consumerGroup; @CFNotNull diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java new file mode 100644 index 00000000000..9d1903b1cb4 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.rpc; + +import org.apache.rocketmq.remoting.CommandCustomHeader; + +public abstract class RpcRequestHeader implements CommandCustomHeader { + protected String bname; + + public String getBname() { + return bname; + } + + public void setBname(String bname) { + this.bname = bname; + } +} \ No newline at end of file diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 9e99925a84a..ddb23cc304b 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -596,12 +596,10 @@ private RollbackStats resetOffsetConsumeOffset(String brokerAddr, String consume long timestamp, boolean force) throws RemotingException, InterruptedException, MQBrokerException { long resetOffset; if (timestamp == -1) { - - resetOffset = this.mqClientInstance.getMQClientAPIImpl().getMaxOffset(brokerAddr, queue.getTopic(), queue.getQueueId(), timeoutMillis); + resetOffset = this.mqClientInstance.getMQClientAPIImpl().getMaxOffset(brokerAddr, queue, timeoutMillis); } else { resetOffset = - this.mqClientInstance.getMQClientAPIImpl().searchOffset(brokerAddr, queue.getTopic(), queue.getQueueId(), timestamp, - timeoutMillis); + this.mqClientInstance.getMQClientAPIImpl().searchOffset(brokerAddr, queue, timestamp, timeoutMillis); } RollbackStats rollbackStats = new RollbackStats(); @@ -619,6 +617,7 @@ private RollbackStats resetOffsetConsumeOffset(String brokerAddr, String consume requestHeader.setTopic(queue.getTopic()); requestHeader.setQueueId(queue.getQueueId()); requestHeader.setCommitOffset(resetOffset); + requestHeader.setBname(queue.getBrokerName()); this.mqClientInstance.getMQClientAPIImpl().updateConsumerOffset(brokerAddr, requestHeader, timeoutMillis); } return rollbackStats; @@ -1145,6 +1144,7 @@ public void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQ requestHeader.setTopic(mq.getTopic()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setCommitOffset(offset); + requestHeader.setBname(mq.getBrokerName()); this.mqClientInstance.getMQClientAPIImpl().updateConsumerOffset(brokerAddr, requestHeader, timeoutMillis); }