Skip to content

Commit

Permalink
[ISSUE #8365] add non-oneway updateConsumerOffset (#8368)
Browse files Browse the repository at this point in the history
  • Loading branch information
qianye1001 authored Jul 12, 2024
1 parent 67ddc1d commit 6c3781f
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,33 @@ public CompletableFuture<Void> updateConsumerOffsetOneWay(
return future;
}

public CompletableFuture<Void> updateConsumerOffsetAsync(
String brokerAddr,
UpdateConsumerOffsetRequestHeader header,
long timeoutMillis
) {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, header);
CompletableFuture<Void> future = new CompletableFuture<>();
invoke(brokerAddr, request, timeoutMillis).whenComplete((response, t) -> {
if (t != null) {
log.error("updateConsumerOffsetAsync failed, brokerAddr={}, requestHeader={}", brokerAddr, header, t);
future.completeExceptionally(t);
return;
}
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
future.complete(null);
}
case ResponseCode.SYSTEM_ERROR:
case ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST:
case ResponseCode.TOPIC_NOT_EXIST: {
future.completeExceptionally(new MQBrokerException(response.getCode(), response.getRemark()));
}
}
});
return future;
}

public CompletableFuture<List<String>> getConsumerListByGroupAsync(
String brokerAddr,
GetConsumerListByGroupRequestHeader requestHeader,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@
package org.apache.rocketmq.client.impl.mqclient;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.utils.FutureUtils;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -34,9 +39,12 @@
import org.mockito.junit.MockitoJUnitRunner;

import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;

@RunWith(MockitoJUnitRunner.class)
public class MQClientAPIExtTest {
Expand Down Expand Up @@ -71,4 +79,33 @@ public void sendMessageAsync() {
CompletableFuture<SendResult> future = mqClientAPIExt.sendMessageAsync("127.0.0.1:10911", "test", msg, requestHeader, 10);
assertThatThrownBy(future::get).getCause().isInstanceOf(RemotingTimeoutException.class);
}

@Test
public void testUpdateConsumerOffsetAsync_Success() throws ExecutionException, InterruptedException {
CompletableFuture<RemotingCommand> remotingFuture = new CompletableFuture<>();
remotingFuture.complete(RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, ""));
doReturn(remotingFuture).when(remotingClientMock).invoke(anyString(), any(RemotingCommand.class), anyLong());

CompletableFuture<Void> future = mqClientAPIExt.updateConsumerOffsetAsync("brokerAddr", new UpdateConsumerOffsetRequestHeader(), 3000L);

assertNull("Future should be completed without exception", future.get());
}

@Test
public void testUpdateConsumerOffsetAsync_Fail() throws InterruptedException {

CompletableFuture<RemotingCommand> remotingFuture = new CompletableFuture<>();
remotingFuture.complete(RemotingCommand.createResponseCommand(ResponseCode.SYSTEM_ERROR, "QueueId is null, topic is testTopic"));
doReturn(remotingFuture).when(remotingClientMock).invoke(anyString(), any(RemotingCommand.class), anyLong());

CompletableFuture<Void> future = mqClientAPIExt.updateConsumerOffsetAsync("brokerAddr", new UpdateConsumerOffsetRequestHeader(), 3000L);

try {
future.get();
} catch (ExecutionException e) {
MQBrokerException customEx = (MQBrokerException) e.getCause();
assertEquals(customEx.getResponseCode(), ResponseCode.SYSTEM_ERROR);
assertEquals(customEx.getErrorMessage(), "QueueId is null, topic is testTopic");
}
}
}

0 comments on commit 6c3781f

Please sign in to comment.