Skip to content

Commit

Permalink
feat: support reset offset for lite pull consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
humkum committed Dec 18, 2024
1 parent 93e2689 commit 12491f3
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.rocketmq.client.impl.MQAdminImpl;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.MQConsumerInner;
Expand Down Expand Up @@ -1248,12 +1249,25 @@ public String findBrokerAddrByTopic(final String topic) {

public synchronized void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
DefaultMQPushConsumerImpl consumer = null;
DefaultLitePullConsumerImpl litePullConsumer;
try {
MQConsumerInner impl = this.consumerTable.get(group);
if (impl instanceof DefaultMQPushConsumerImpl) {
consumer = (DefaultMQPushConsumerImpl) impl;
} else if (impl instanceof DefaultLitePullConsumerImpl) {
litePullConsumer = (DefaultLitePullConsumerImpl) impl;
litePullConsumer.pause(offsetTable.keySet());
for (Entry<MessageQueue, Long> messageQueueLongEntry : offsetTable.entrySet()) {
try {
litePullConsumer.seek(messageQueueLongEntry.getKey(), messageQueueLongEntry.getValue());
} catch (MQClientException e) {
log.warn("[reset-offset] reset offset failed, topic={}, group={}, mq={}", topic, group, messageQueueLongEntry.getValue());
}
}
litePullConsumer.resume(offsetTable.keySet());
return;
} else {
log.info("[reset-offset] consumer does not exist. group={}", group);
log.info("[reset-offset] consumer does not support reset offset. group={}", group);
return;
}
consumer.suspend();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageService;
import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.MQConsumerInner;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.client.impl.consumer.RebalanceImpl;
import org.apache.rocketmq.client.impl.consumer.RebalancePushImpl;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
Expand Down Expand Up @@ -102,6 +104,12 @@ public class MQClientInstanceTest {
@Mock
private ClientConfig clientConfig;

@Mock
private DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;

@Mock
private RebalancePushImpl rebalancePushImpl;

private final MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());

private final String topic = "FooBar";
Expand Down Expand Up @@ -501,4 +509,42 @@ private List<BrokerData> createBrokerDatas() {
brokerData.setBrokerAddrs(brokerAddrs);
return Collections.singletonList(brokerData);
}

@Test
public void testResetOffsetForDefaultMQPushConsumer() {
Map<MessageQueue, Long> offsetTable = new HashMap<>();
MessageQueue messageQueue = new MessageQueue(topic, defaultBroker, 0);
offsetTable.put(messageQueue, 100L);
ConcurrentHashMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<>();
processQueueTable.put(messageQueue, new ProcessQueue());

when(defaultMQPushConsumerImpl.getRebalanceImpl()).thenReturn(rebalancePushImpl);
when(rebalancePushImpl.getProcessQueueTable()).thenReturn(processQueueTable);

mqClientInstance.registerConsumer(group, defaultMQPushConsumerImpl);
mqClientInstance.resetOffset(topic, group, offsetTable);

verify(defaultMQPushConsumerImpl).suspend();
verify(defaultMQPushConsumerImpl).resume();
verify(rebalancePushImpl).removeUnnecessaryMessageQueue(any(MessageQueue.class), any(ProcessQueue.class));
verify(defaultMQPushConsumerImpl, times(1)).updateConsumeOffset(any(MessageQueue.class), anyLong());
}

@Test
public void testResetOffsetForDefaultLitePullConsumer() throws MQClientException {
Map<MessageQueue, Long> offsetTable = new HashMap<>();
MessageQueue messageQueue = new MessageQueue(topic, defaultBroker, 0);
offsetTable.put(messageQueue, 100L);

DefaultLitePullConsumerImpl defaultLitePullConsumerImpl = mock(DefaultLitePullConsumerImpl.class);
when(defaultLitePullConsumerImpl.isRunning()).thenReturn(true);
when(defaultLitePullConsumerImpl.assignment()).thenReturn(new HashSet<>(Collections.singletonList(messageQueue)));

mqClientInstance.registerConsumer(group, defaultLitePullConsumerImpl);
mqClientInstance.resetOffset(topic, group, offsetTable);

verify(defaultLitePullConsumerImpl).pause(any(Set.class));
verify(defaultLitePullConsumerImpl).seek(any(MessageQueue.class), anyLong());
verify(defaultLitePullConsumerImpl).resume(any(Set.class));
}
}

0 comments on commit 12491f3

Please sign in to comment.