diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java index 4afe871afcd..2817f7d8fc6 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java @@ -213,6 +213,7 @@ public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean is requestHeader.setConsumerGroup(this.groupName); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setCommitOffset(offset); + requestHeader.setBname(mq.getBrokerName()); if (isOneway) { this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway( @@ -239,6 +240,7 @@ private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingExcept requestHeader.setTopic(mq.getTopic()); requestHeader.setConsumerGroup(this.groupName); requestHeader.setQueueId(mq.getQueueId()); + requestHeader.setBname(mq.getBrokerName()); return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset( findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java index 8d3a2e91906..38527859460 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java @@ -193,8 +193,7 @@ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientExcepti if (brokerAddr != null) { try { - return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timestamp, - timeoutMillis); + return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq, timestamp, timeoutMillis); } catch (Exception e) { throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e); } @@ -212,7 +211,7 @@ public long maxOffset(MessageQueue mq) throws MQClientException { if (brokerAddr != null) { try { - return this.mQClientFactory.getMQClientAPIImpl().getMaxOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timeoutMillis); + return this.mQClientFactory.getMQClientAPIImpl().getMaxOffset(brokerAddr, mq, timeoutMillis); } catch (Exception e) { throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e); } @@ -230,7 +229,7 @@ public long minOffset(MessageQueue mq) throws MQClientException { if (brokerAddr != null) { try { - return this.mQClientFactory.getMQClientAPIImpl().getMinOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timeoutMillis); + return this.mQClientFactory.getMQClientAPIImpl().getMinOffset(brokerAddr, mq, timeoutMillis); } catch (Exception e) { throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e); } @@ -248,8 +247,7 @@ public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { if (brokerAddr != null) { try { - return this.mQClientFactory.getMQClientAPIImpl().getEarliestMsgStoretime(brokerAddr, mq.getTopic(), mq.getQueueId(), - timeoutMillis); + return this.mQClientFactory.getMQClientAPIImpl().getEarliestMsgStoretime(brokerAddr, mq, timeoutMillis); } catch (Exception e) { throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 1d33219536d..528f5646e2a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -1136,6 +1136,7 @@ public MessageExt viewMessage(final String addr, final long phyoffset, final lon throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } + @Deprecated public long searchOffset(final String addr, final String topic, final int queueId, final long timestamp, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException { @@ -1161,11 +1162,37 @@ public long searchOffset(final String addr, final String topic, final int queueI throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } - public long getMaxOffset(final String addr, final String topic, final int queueId, final long timeoutMillis) + public long searchOffset(final String addr, final MessageQueue messageQueue, final long timestamp, final long timeoutMillis) + throws RemotingException, MQBrokerException, InterruptedException { + SearchOffsetRequestHeader requestHeader = new SearchOffsetRequestHeader(); + requestHeader.setTopic(messageQueue.getTopic()); + requestHeader.setQueueId(messageQueue.getQueueId()); + requestHeader.setBname(messageQueue.getBrokerName()); + requestHeader.setTimestamp(timestamp); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + SearchOffsetResponseHeader responseHeader = + (SearchOffsetResponseHeader) response.decodeCommandCustomHeader(SearchOffsetResponseHeader.class); + return responseHeader.getOffset(); + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); + } + + public long getMaxOffset(final String addr, final MessageQueue messageQueue, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException { GetMaxOffsetRequestHeader requestHeader = new GetMaxOffsetRequestHeader(); - requestHeader.setTopic(topic); - requestHeader.setQueueId(queueId); + requestHeader.setTopic(messageQueue.getTopic()); + requestHeader.setQueueId(messageQueue.getQueueId()); + requestHeader.setBname(messageQueue.getBrokerName()); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), @@ -1212,11 +1239,12 @@ public List getConsumerIdListByGroup( throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } - public long getMinOffset(final String addr, final String topic, final int queueId, final long timeoutMillis) + public long getMinOffset(final String addr, final MessageQueue messageQueue, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException { GetMinOffsetRequestHeader requestHeader = new GetMinOffsetRequestHeader(); - requestHeader.setTopic(topic); - requestHeader.setQueueId(queueId); + requestHeader.setTopic(messageQueue.getTopic()); + requestHeader.setQueueId(messageQueue.getQueueId()); + requestHeader.setBname(messageQueue.getBrokerName()); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MIN_OFFSET, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), @@ -1236,12 +1264,12 @@ public long getMinOffset(final String addr, final String topic, final int queueI throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } - public long getEarliestMsgStoretime(final String addr, final String topic, final int queueId, - final long timeoutMillis) + public long getEarliestMsgStoretime(final String addr, final MessageQueue mq, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException { GetEarliestMsgStoretimeRequestHeader requestHeader = new GetEarliestMsgStoretimeRequestHeader(); - requestHeader.setTopic(topic); - requestHeader.setQueueId(queueId); + requestHeader.setTopic(mq.getTopic()); + requestHeader.setQueueId(mq.getQueueId()); + requestHeader.setBname(mq.getBrokerName()); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_EARLIEST_MSG_STORETIME, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java index 6ce8e261ca2..187b2573dc2 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java @@ -227,13 +227,13 @@ public PullResult pullKernelImpl( requestHeader.setSubVersion(subVersion); requestHeader.setMaxMsgBytes(maxSizeInBytes); requestHeader.setExpressionType(expressionType); + requestHeader.setBname(mq.getBrokerName()); String brokerAddr = findBrokerResult.getBrokerAddr(); if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) { brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr); } - PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage( brokerAddr, requestHeader, diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java index 02445b31fca..1e5f507c2f0 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java @@ -41,6 +41,7 @@ import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; @@ -712,7 +713,7 @@ public RemotingCommand answer(InvocationOnMock mock) { } }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong()); - long offset = mqClientAPI.getMaxOffset(brokerAddr, topic, 0, 10000); + long offset = mqClientAPI.getMaxOffset(brokerAddr, new MessageQueue(topic, brokerName, 0), 10000); assertThat(offset).isEqualTo(100L); } @@ -733,7 +734,7 @@ public RemotingCommand answer(InvocationOnMock mock) { } }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong()); - long offset = mqClientAPI.getMinOffset(brokerAddr, topic, 0, 10000); + long offset = mqClientAPI.getMinOffset(brokerAddr, new MessageQueue(topic, brokerName, 0), 10000); assertThat(offset).isEqualTo(100L); } @@ -754,7 +755,7 @@ public RemotingCommand answer(InvocationOnMock mock) { } }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong()); - long t = mqClientAPI.getEarliestMsgStoretime(brokerAddr, topic, 0, 10000); + long t = mqClientAPI.getEarliestMsgStoretime(brokerAddr, new MessageQueue(topic, brokerName, 0), 10000); assertThat(t).isEqualTo(100L); } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index 311beb7f89a..328c673db5f 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -734,6 +734,7 @@ public void createStaticTopic(String addr, String defaultTopic, TopicConfig topi this.defaultMQAdminExtImpl.createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force); } + @Deprecated @Override public long searchOffset(final String brokerAddr, final String topicName, final int queueId, final long timestamp, final long timeoutMillis) diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index eceaf040f81..5c7871987f5 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -868,10 +868,9 @@ private RollbackStats resetOffsetConsumeOffset(String brokerAddr, String consume boolean force) throws RemotingException, InterruptedException, MQBrokerException { long resetOffset; if (timestamp == -1) { - - resetOffset = this.mqClientInstance.getMQClientAPIImpl().getMaxOffset(brokerAddr, queue.getTopic(), queue.getQueueId(), timeoutMillis); + resetOffset = this.mqClientInstance.getMQClientAPIImpl().getMaxOffset(brokerAddr, queue, timeoutMillis); } else { - resetOffset = this.mqClientInstance.getMQClientAPIImpl().searchOffset(brokerAddr, queue.getTopic(), queue.getQueueId(), timestamp, timeoutMillis); + resetOffset = this.mqClientInstance.getMQClientAPIImpl().searchOffset(brokerAddr, queue, timestamp, timeoutMillis); } RollbackStats rollbackStats = new RollbackStats(); @@ -889,6 +888,7 @@ private RollbackStats resetOffsetConsumeOffset(String brokerAddr, String consume requestHeader.setTopic(queue.getTopic()); requestHeader.setQueueId(queue.getQueueId()); requestHeader.setCommitOffset(resetOffset); + requestHeader.setBname(queue.getBrokerName()); this.mqClientInstance.getMQClientAPIImpl().updateConsumerOffset(brokerAddr, requestHeader, timeoutMillis); } return rollbackStats; @@ -1590,6 +1590,7 @@ public boolean consumedConcurrent(final MessageExt msg, requestHeader.setTopic(mq.getTopic()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setCommitOffset(offset); + requestHeader.setBname(mq.getBrokerName()); this.mqClientInstance.getMQClientAPIImpl().updateConsumerOffset(brokerAddr, requestHeader, timeoutMillis); } @@ -1634,6 +1635,7 @@ public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String this.mqClientInstance.getMQClientAPIImpl().setMessageRequestMode(brokerAddr, topic, consumerGroup, mode, popShareQueueNum, timeoutMillis); } + @Deprecated @Override public long searchOffset(final String brokerAddr, final String topicName, final int queueId, final long timestamp, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index 59ac6c5cb26..5f99cb69b54 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -371,6 +371,7 @@ void setMessageRequestMode(final String brokerAddr, final String topic, final St throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException; + @Deprecated long searchOffset(final String brokerAddr, final String topicName, final int queueId, final long timestamp, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException; diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java index 17e04e7f583..e8bc91a02c7 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java @@ -16,6 +16,19 @@ */ package org.apache.rocketmq.tools.admin; +import java.io.UnsupportedEncodingException; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -70,23 +83,9 @@ import org.junit.runner.RunWith; import org.mockito.junit.MockitoJUnitRunner; -import java.io.UnsupportedEncodingException; -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.concurrent.ConcurrentHashMap; - import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; -import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; @@ -467,14 +466,14 @@ public void testGetAllSubscriptionGroup() throws InterruptedException, MQBrokerE @Test public void testMaxOffset() throws Exception { - when(mQClientAPIImpl.getMaxOffset(anyString(), anyString(), anyInt(), anyLong())).thenReturn(100L); + when(mQClientAPIImpl.getMaxOffset(anyString(), any(MessageQueue.class), anyLong())).thenReturn(100L); assertThat(defaultMQAdminExt.maxOffset(new MessageQueue(topic1, broker1Name, 0))).isEqualTo(100L); } @Test public void testSearchOffset() throws Exception { - when(mQClientAPIImpl.searchOffset(anyString(), anyString(), anyInt(), anyLong(), anyLong())).thenReturn(101L); + when(mQClientAPIImpl.searchOffset(anyString(), any(MessageQueue.class), anyLong(), anyLong())).thenReturn(101L); assertThat(defaultMQAdminExt.searchOffset(new MessageQueue(topic1, broker1Name, 0), System.currentTimeMillis())).isEqualTo(101L); }