Skip to content

Commit

Permalink
[ISSUE #3905] Support bname in protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
drpmma authored and zhouxinyu committed Jul 18, 2022
1 parent 3ee603a commit ac6ccc5
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean is
requestHeader.setConsumerGroup(this.groupName);
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setCommitOffset(offset);
requestHeader.setBname(mq.getBrokerName());

if (isOneway) {
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
Expand All @@ -239,6 +240,7 @@ private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingExcept
requestHeader.setTopic(mq.getTopic());
requestHeader.setConsumerGroup(this.groupName);
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setBname(mq.getBrokerName());

return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,7 @@ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientExcepti

if (brokerAddr != null) {
try {
return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timestamp,
timeoutMillis);
return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq, timestamp, timeoutMillis);
} catch (Exception e) {
throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
}
Expand All @@ -212,7 +211,7 @@ public long maxOffset(MessageQueue mq) throws MQClientException {

if (brokerAddr != null) {
try {
return this.mQClientFactory.getMQClientAPIImpl().getMaxOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timeoutMillis);
return this.mQClientFactory.getMQClientAPIImpl().getMaxOffset(brokerAddr, mq, timeoutMillis);
} catch (Exception e) {
throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
}
Expand All @@ -230,7 +229,7 @@ public long minOffset(MessageQueue mq) throws MQClientException {

if (brokerAddr != null) {
try {
return this.mQClientFactory.getMQClientAPIImpl().getMinOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timeoutMillis);
return this.mQClientFactory.getMQClientAPIImpl().getMinOffset(brokerAddr, mq, timeoutMillis);
} catch (Exception e) {
throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
}
Expand All @@ -248,8 +247,7 @@ public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {

if (brokerAddr != null) {
try {
return this.mQClientFactory.getMQClientAPIImpl().getEarliestMsgStoretime(brokerAddr, mq.getTopic(), mq.getQueueId(),
timeoutMillis);
return this.mQClientFactory.getMQClientAPIImpl().getEarliestMsgStoretime(brokerAddr, mq, timeoutMillis);
} catch (Exception e) {
throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1136,6 +1136,7 @@ public MessageExt viewMessage(final String addr, final long phyoffset, final lon
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}

@Deprecated
public long searchOffset(final String addr, final String topic, final int queueId, final long timestamp,
final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException {
Expand All @@ -1161,11 +1162,37 @@ public long searchOffset(final String addr, final String topic, final int queueI
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}

public long getMaxOffset(final String addr, final String topic, final int queueId, final long timeoutMillis)
public long searchOffset(final String addr, final MessageQueue messageQueue, final long timestamp, final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException {
SearchOffsetRequestHeader requestHeader = new SearchOffsetRequestHeader();
requestHeader.setTopic(messageQueue.getTopic());
requestHeader.setQueueId(messageQueue.getQueueId());
requestHeader.setBname(messageQueue.getBrokerName());
requestHeader.setTimestamp(timestamp);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader);

RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
SearchOffsetResponseHeader responseHeader =
(SearchOffsetResponseHeader) response.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
return responseHeader.getOffset();
}
default:
break;
}

throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}

public long getMaxOffset(final String addr, final MessageQueue messageQueue, final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException {
GetMaxOffsetRequestHeader requestHeader = new GetMaxOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setQueueId(queueId);
requestHeader.setTopic(messageQueue.getTopic());
requestHeader.setQueueId(messageQueue.getQueueId());
requestHeader.setBname(messageQueue.getBrokerName());
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, requestHeader);

RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
Expand Down Expand Up @@ -1212,11 +1239,12 @@ public List<String> getConsumerIdListByGroup(
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}

public long getMinOffset(final String addr, final String topic, final int queueId, final long timeoutMillis)
public long getMinOffset(final String addr, final MessageQueue messageQueue, final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException {
GetMinOffsetRequestHeader requestHeader = new GetMinOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setQueueId(queueId);
requestHeader.setTopic(messageQueue.getTopic());
requestHeader.setQueueId(messageQueue.getQueueId());
requestHeader.setBname(messageQueue.getBrokerName());
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MIN_OFFSET, requestHeader);

RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
Expand All @@ -1236,12 +1264,12 @@ public long getMinOffset(final String addr, final String topic, final int queueI
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}

public long getEarliestMsgStoretime(final String addr, final String topic, final int queueId,
final long timeoutMillis)
public long getEarliestMsgStoretime(final String addr, final MessageQueue mq, final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException {
GetEarliestMsgStoretimeRequestHeader requestHeader = new GetEarliestMsgStoretimeRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setQueueId(queueId);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setBname(mq.getBrokerName());
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_EARLIEST_MSG_STORETIME, requestHeader);

RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,13 +227,13 @@ public PullResult pullKernelImpl(
requestHeader.setSubVersion(subVersion);
requestHeader.setMaxMsgBytes(maxSizeInBytes);
requestHeader.setExpressionType(expressionType);
requestHeader.setBname(mq.getBrokerName());

String brokerAddr = findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr);
}


PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
brokerAddr,
requestHeader,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
Expand Down Expand Up @@ -712,7 +713,7 @@ public RemotingCommand answer(InvocationOnMock mock) {
}
}).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());

long offset = mqClientAPI.getMaxOffset(brokerAddr, topic, 0, 10000);
long offset = mqClientAPI.getMaxOffset(brokerAddr, new MessageQueue(topic, brokerName, 0), 10000);
assertThat(offset).isEqualTo(100L);
}

Expand All @@ -733,7 +734,7 @@ public RemotingCommand answer(InvocationOnMock mock) {
}
}).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());

long offset = mqClientAPI.getMinOffset(brokerAddr, topic, 0, 10000);
long offset = mqClientAPI.getMinOffset(brokerAddr, new MessageQueue(topic, brokerName, 0), 10000);
assertThat(offset).isEqualTo(100L);
}

Expand All @@ -754,7 +755,7 @@ public RemotingCommand answer(InvocationOnMock mock) {
}
}).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());

long t = mqClientAPI.getEarliestMsgStoretime(brokerAddr, topic, 0, 10000);
long t = mqClientAPI.getEarliestMsgStoretime(brokerAddr, new MessageQueue(topic, brokerName, 0), 10000);
assertThat(t).isEqualTo(100L);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,7 @@ public void createStaticTopic(String addr, String defaultTopic, TopicConfig topi
this.defaultMQAdminExtImpl.createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force);
}

@Deprecated
@Override
public long searchOffset(final String brokerAddr, final String topicName,
final int queueId, final long timestamp, final long timeoutMillis)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -868,10 +868,9 @@ private RollbackStats resetOffsetConsumeOffset(String brokerAddr, String consume
boolean force) throws RemotingException, InterruptedException, MQBrokerException {
long resetOffset;
if (timestamp == -1) {

resetOffset = this.mqClientInstance.getMQClientAPIImpl().getMaxOffset(brokerAddr, queue.getTopic(), queue.getQueueId(), timeoutMillis);
resetOffset = this.mqClientInstance.getMQClientAPIImpl().getMaxOffset(brokerAddr, queue, timeoutMillis);
} else {
resetOffset = this.mqClientInstance.getMQClientAPIImpl().searchOffset(brokerAddr, queue.getTopic(), queue.getQueueId(), timestamp, timeoutMillis);
resetOffset = this.mqClientInstance.getMQClientAPIImpl().searchOffset(brokerAddr, queue, timestamp, timeoutMillis);
}

RollbackStats rollbackStats = new RollbackStats();
Expand All @@ -889,6 +888,7 @@ private RollbackStats resetOffsetConsumeOffset(String brokerAddr, String consume
requestHeader.setTopic(queue.getTopic());
requestHeader.setQueueId(queue.getQueueId());
requestHeader.setCommitOffset(resetOffset);
requestHeader.setBname(queue.getBrokerName());
this.mqClientInstance.getMQClientAPIImpl().updateConsumerOffset(brokerAddr, requestHeader, timeoutMillis);
}
return rollbackStats;
Expand Down Expand Up @@ -1590,6 +1590,7 @@ public boolean consumedConcurrent(final MessageExt msg,
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setCommitOffset(offset);
requestHeader.setBname(mq.getBrokerName());
this.mqClientInstance.getMQClientAPIImpl().updateConsumerOffset(brokerAddr, requestHeader, timeoutMillis);
}

Expand Down Expand Up @@ -1634,6 +1635,7 @@ public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String
this.mqClientInstance.getMQClientAPIImpl().setMessageRequestMode(brokerAddr, topic, consumerGroup, mode, popShareQueueNum, timeoutMillis);
}

@Deprecated
@Override
public long searchOffset(final String brokerAddr, final String topicName, final int queueId, final long timestamp,
final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ void setMessageRequestMode(final String brokerAddr, final String topic, final St
throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQClientException;

@Deprecated
long searchOffset(final String brokerAddr, final String topicName,
final int queueId, final long timestamp, final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,19 @@
*/
package org.apache.rocketmq.tools.admin;

import java.io.UnsupportedEncodingException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
Expand Down Expand Up @@ -70,23 +83,9 @@
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;

import java.io.UnsupportedEncodingException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -467,14 +466,14 @@ public void testGetAllSubscriptionGroup() throws InterruptedException, MQBrokerE

@Test
public void testMaxOffset() throws Exception {
when(mQClientAPIImpl.getMaxOffset(anyString(), anyString(), anyInt(), anyLong())).thenReturn(100L);
when(mQClientAPIImpl.getMaxOffset(anyString(), any(MessageQueue.class), anyLong())).thenReturn(100L);

assertThat(defaultMQAdminExt.maxOffset(new MessageQueue(topic1, broker1Name, 0))).isEqualTo(100L);
}

@Test
public void testSearchOffset() throws Exception {
when(mQClientAPIImpl.searchOffset(anyString(), anyString(), anyInt(), anyLong(), anyLong())).thenReturn(101L);
when(mQClientAPIImpl.searchOffset(anyString(), any(MessageQueue.class), anyLong(), anyLong())).thenReturn(101L);

assertThat(defaultMQAdminExt.searchOffset(new MessageQueue(topic1, broker1Name, 0), System.currentTimeMillis())).isEqualTo(101L);
}
Expand Down

0 comments on commit ac6ccc5

Please sign in to comment.