From 7736c5dddabf3845065a58de224ccb240b0f5386 Mon Sep 17 00:00:00 2001 From: SSpirits Date: Fri, 8 Dec 2023 10:58:34 +0800 Subject: [PATCH] refactor(proxy): proxy message to the node assigned to its queue (#812) Signed-off-by: SSpirits --- .../rocketmq/broker/BrokerController.java | 2 +- .../proxy/service/DeadLetterService.java | 14 +++--- .../proxy/service/MessageServiceImpl.java | 44 ++++++++++++++----- .../proxy/service/DLQServiceTest.java | 12 ++--- .../proxy/service/MessageServiceImplTest.java | 7 +-- .../rocketmq/store/api/DeadLetterSender.java | 6 +-- .../rocketmq/store/service/ReviveService.java | 4 +- .../rocketmq/store/MessageStoreTest.java | 2 +- .../store/service/ReviveServiceTest.java | 20 ++++----- 9 files changed, 66 insertions(+), 45 deletions(-) diff --git a/broker/src/main/java/com/automq/rocketmq/broker/BrokerController.java b/broker/src/main/java/com/automq/rocketmq/broker/BrokerController.java index ea827ed27..d4b233f3a 100644 --- a/broker/src/main/java/com/automq/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/com/automq/rocketmq/broker/BrokerController.java @@ -124,7 +124,7 @@ public BrokerController(BrokerConfig brokerConfig) throws Exception { LockService lockService = new LockService(brokerConfig.proxy()); ProducerManager producerManager = new ProducerManager(); - MessageServiceImpl messageServiceImpl = new MessageServiceImpl(brokerConfig.proxy(), messageStore, proxyMetadataService, lockService, dlqService, producerManager); + MessageServiceImpl messageServiceImpl = new MessageServiceImpl(brokerConfig, messageStore, proxyMetadataService, lockService, dlqService, producerManager, relayClient); this.messageService = messageServiceImpl; this.extendMessageService = messageServiceImpl; ConsumerManager consumerManager = new ConsumerManager(new DefaultServiceManager.ConsumerIdsChangeListenerImpl(), brokerConfig.proxy().channelExpiredTimeout()); diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/service/DeadLetterService.java b/proxy/src/main/java/com/automq/rocketmq/proxy/service/DeadLetterService.java index 20b54927c..c7360e5a5 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/service/DeadLetterService.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/service/DeadLetterService.java @@ -22,7 +22,6 @@ import apache.rocketmq.controller.v1.MessageType; import apache.rocketmq.controller.v1.Topic; import com.automq.rocketmq.common.config.BrokerConfig; -import com.automq.rocketmq.common.model.FlatMessageExt; import com.automq.rocketmq.common.model.generated.FlatMessage; import com.automq.rocketmq.common.system.MessageConstants; import com.automq.rocketmq.common.trace.TraceContext; @@ -66,7 +65,7 @@ public void init(MessageStore messageStore) { @Override @WithSpan - public CompletableFuture send(TraceContext context, long consumerGroupId, FlatMessageExt flatMessageExt) { + public CompletableFuture send(TraceContext context, long consumerGroupId, FlatMessage message) { if (messageStore == null) { return CompletableFuture.failedFuture(new IllegalStateException("Message store is not initialized")); } @@ -74,16 +73,16 @@ public CompletableFuture send(TraceContext context, long consumerGroupId, CompletableFuture dlqQueryCf = metadataService.consumerGroupOf(consumerGroupId) .thenComposeAsync(consumerGroup -> { long deadLetterTopicId = consumerGroup.getDeadLetterTopicId(); - long topicId = flatMessageExt.message().topicId(); + long topicId = message.topicId(); if (deadLetterTopicId == MessageConstants.UNINITIALIZED_TOPIC_ID) { // not allow to send to DLQ LOGGER.warn("Message: {} is dropped because the consumer group: {} doesn't have DLQ topic", - flatMessageExt, consumerGroupId); + message.systemProperties().messageId(), consumerGroupId); return CompletableFuture.completedFuture(null); } if (deadLetterTopicId == topicId) { LOGGER.error("Message: {} is dropped because the consumer group: {} has the same DLQ topic: {} with original topic", - flatMessageExt, consumerGroupId, topicId); + message.systemProperties().messageId(), consumerGroupId, topicId); return CompletableFuture.completedFuture(null); } // get dlq topic info @@ -98,17 +97,16 @@ public CompletableFuture send(TraceContext context, long consumerGroupId, if (!(dlqTopic.getAcceptTypes().getTypesList().contains(MessageType.NORMAL) || dlqTopic.getAcceptTypes().getTypesList().contains(MessageType.FIFO))) { LOGGER.error("Message: {} is dropped because the consumer group: {} has invalid DLQ topic: {}", - flatMessageExt, consumerGroupId, dlqTopic); + message.systemProperties().messageId(), consumerGroupId, dlqTopic); return CompletableFuture.completedFuture(null); } - FlatMessage message = flatMessageExt.message(); message.mutateTopicId(dlqTopic.getTopicId()); List assignmentsList = new ArrayList<>(dlqTopic.getAssignmentsList()); if (assignmentsList.isEmpty()) { LOGGER.error("Message: {} is dropped because the consumer group: {} has empty DLQ topic: {}", - flatMessageExt, consumerGroupId, dlqTopic); + message.systemProperties().messageId(), consumerGroupId, dlqTopic); return CompletableFuture.completedFuture(null); } diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java b/proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java index 8ab683bb5..f5407e685 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java @@ -19,11 +19,13 @@ import apache.rocketmq.common.v1.Code; import apache.rocketmq.controller.v1.ConsumerGroup; +import apache.rocketmq.controller.v1.MessageQueueAssignment; import apache.rocketmq.controller.v1.StreamRole; import apache.rocketmq.controller.v1.SubscriptionMode; import apache.rocketmq.controller.v1.Topic; import apache.rocketmq.proxy.v1.QueueStats; import apache.rocketmq.proxy.v1.StreamStats; +import com.automq.rocketmq.common.config.BrokerConfig; import com.automq.rocketmq.common.config.ProxyConfig; import com.automq.rocketmq.common.exception.ControllerException; import com.automq.rocketmq.common.model.FlatMessageExt; @@ -31,6 +33,7 @@ import com.automq.rocketmq.common.util.CommonUtil; import com.automq.rocketmq.metadata.api.ProxyMetadataService; import com.automq.rocketmq.proxy.exception.ProxyException; +import com.automq.rocketmq.proxy.grpc.ProxyClient; import com.automq.rocketmq.proxy.metrics.ProxyMetricsManager; import com.automq.rocketmq.proxy.model.ProxyContextExt; import com.automq.rocketmq.proxy.model.VirtualQueue; @@ -118,6 +121,7 @@ public class MessageServiceImpl implements MessageService, ExtendMessageService { private static final Logger LOGGER = LoggerFactory.getLogger(MessageServiceImpl.class); + private final BrokerConfig brokerConfig; private final ProxyConfig config; private final ProxyMetadataService metadataService; private final MessageStore store; @@ -125,19 +129,22 @@ public class MessageServiceImpl implements MessageService, ExtendMessageService private final DeadLetterSender deadLetterService; private final SuspendRequestService suspendRequestService; private final ProducerManager producerManager; + private final ProxyClient relayClient; private final ExecutorService executorService = ThreadPoolMonitor.createAndMonitor(2, 5, 100, TimeUnit.SECONDS, "Transaction-msg-check-thread", 2000); - public MessageServiceImpl(ProxyConfig config, MessageStore store, ProxyMetadataService metadataService, - LockService lockService, DeadLetterSender deadLetterService, - ProducerManager producerManager) throws StoreException { - this.config = config; + public MessageServiceImpl(BrokerConfig config, MessageStore store, ProxyMetadataService metadataService, + LockService lockService, DeadLetterSender deadLetterService, ProducerManager producerManager, + ProxyClient relayClient) throws StoreException { + this.brokerConfig = config; + this.config = config.proxy(); this.store = store; this.metadataService = metadataService; this.deadLetterService = deadLetterService; this.lockService = lockService; this.suspendRequestService = SuspendRequestService.getInstance(); this.producerManager = producerManager; + this.relayClient = relayClient; store.registerTransactionCheckHandler(timerTag -> executorService.execute(() -> { try { checkTransactionStatus(timerTag); @@ -187,20 +194,24 @@ public CompletableFuture> sendMessage(ProxyContext ctx, ProxyContextExt contextExt = (ProxyContextExt) ctx; FlatMessage flatMessage = FlatMessageUtil.convertTo(contextExt, topic.getTopicId(), virtualQueue.physicalQueueId(), config.hostName(), message); + Optional optional = topic.getAssignmentsList().stream().filter(assignment -> assignment.getQueue().getQueueId() == flatMessage.queueId()).findFirst(); + if (optional.isEmpty()) { + LOGGER.error("Message: {} is dropped because the topic: {} doesn't have queue: {}", + messageId, topic.getName(), flatMessage.queueId()); + return CompletableFuture.failedFuture(new ProxyException(apache.rocketmq.v2.Code.BAD_REQUEST, "Queue " + flatMessage.queueId() + " is not assigned to any node.")); + } + MessageQueueAssignment assignment = optional.get(); + if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { flatMessage.systemProperties().mutateDeliveryAttempts(requestHeader.getReconsumeTimes() + 1); if (requestHeader.getReconsumeTimes() > requestHeader.getMaxReconsumeTimes()) { String groupName = requestHeader.getTopic().replace(MixAll.RETRY_GROUP_TOPIC_PREFIX, ""); - FlatMessageExt flatMessageExt = FlatMessageExt.Builder.builder() - .message(flatMessage) - .offset(0) - .build(); contextExt.span().ifPresent(span -> { span.setAttribute("deadLetter", true); span.setAttribute("group", groupName); }); return consumerGroupOf(groupName) - .thenCompose(group -> deadLetterService.send(contextExt, group.getGroupId(), flatMessageExt)) + .thenCompose(group -> deadLetterService.send(contextExt, group.getGroupId(), flatMessage)) .thenApply(ignore -> new PutResult(PutResult.Status.PUT_OK, 0)); } else { String groupName = requestHeader.getTopic().replace(MixAll.RETRY_GROUP_TOPIC_PREFIX, ""); @@ -210,10 +221,20 @@ public CompletableFuture> sendMessage(ProxyContext ctx, span.setAttribute("reconsumeTimes", requestHeader.getReconsumeTimes()); span.setAttribute("deliveryTimestamp", flatMessage.systemProperties().deliveryTimestamp()); }); + if (assignment.getNodeId() != brokerConfig.nodeId()) { + return metadataService.addressOf(assignment.getNodeId()) + .thenCompose(address -> relayClient.relayMessage(address, flatMessage)) + .thenApply(status -> new PutResult(PutResult.Status.PUT_OK, 0)); + } return store.put(ContextUtil.buildStoreContext(ctx, topic.getName(), groupName), flatMessage); } } + if (assignment.getNodeId() != brokerConfig.nodeId()) { + return metadataService.addressOf(assignment.getNodeId()) + .thenCompose(address -> relayClient.relayMessage(address, flatMessage)) + .thenApply(status -> new PutResult(PutResult.Status.PUT_OK, 0)); + } return store.put(ContextUtil.buildStoreContext(ctx, topic.getName(), ""), flatMessage); }); @@ -274,7 +295,7 @@ public CompletableFuture sendMessageBack(ProxyContext ctx, Rece throw new ProxyException(apache.rocketmq.v2.Code.MESSAGE_NOT_FOUND, "Message not found from server."); }).thenCompose(messageExt -> { if (messageExt.deliveryAttempts() > group.getMaxDeliveryAttempt()) { - return deadLetterService.send((ProxyContextExt) ctx, group.getGroupId(), messageExt); + return deadLetterService.send((ProxyContextExt) ctx, group.getGroupId(), messageExt.message()); } // Message consume retry strategy @@ -282,7 +303,8 @@ public CompletableFuture sendMessageBack(ProxyContext ctx, Rece // =0: broker control retry frequency // >0: client control retry frequency return switch (Integer.compare(delayLevel, 0)) { - case -1 -> deadLetterService.send((ProxyContextExt) ctx, group.getGroupId(), messageExt); + case -1 -> + deadLetterService.send((ProxyContextExt) ctx, group.getGroupId(), messageExt.message()); case 0 -> topicOf(MixAll.RETRY_GROUP_TOPIC_PREFIX + requestHeader.getGroup()) .thenCompose(retryTopic -> { // Keep the same logic as apache RocketMQ. diff --git a/proxy/src/test/java/com/automq/rocketmq/proxy/service/DLQServiceTest.java b/proxy/src/test/java/com/automq/rocketmq/proxy/service/DLQServiceTest.java index 87c21f359..d3eb4b3e4 100644 --- a/proxy/src/test/java/com/automq/rocketmq/proxy/service/DLQServiceTest.java +++ b/proxy/src/test/java/com/automq/rocketmq/proxy/service/DLQServiceTest.java @@ -108,7 +108,7 @@ public void send_normal() { return CompletableFuture.completedFuture(new PutResult(PutResult.Status.PUT_OK, 0)); }).when(messageStore).put(Mockito.any(), Mockito.any(FlatMessage.class)); - dlqService.send(StoreContext.EMPTY, CONSUMER_GROUP_ID, msg).join(); + dlqService.send(StoreContext.EMPTY, CONSUMER_GROUP_ID, msg.message()).join(); Mockito.verify(messageStore, Mockito.times(1)).put(Mockito.any(), Mockito.any(FlatMessage.class)); } @@ -132,7 +132,7 @@ public void send_group_not_allowed_dlq() { FlatMessageExt msg = MockMessageUtil.buildMessage(TOPIC_ID, QUEUE_ID, "TAG_DLQ"); - dlqService.send(StoreContext.EMPTY, CONSUMER_GROUP_ID, msg).join(); + dlqService.send(StoreContext.EMPTY, CONSUMER_GROUP_ID, msg.message()).join(); Mockito.verify(messageStore, Mockito.times(0)).put(Mockito.any(), Mockito.any(FlatMessage.class)); // 2. DLQ topic is configured but has no assignment @@ -147,7 +147,7 @@ public void send_group_not_allowed_dlq() { Mockito.doReturn(CompletableFuture.completedFuture(consumerGroup)) .when(metadataService).consumerGroupOf(CONSUMER_GROUP_ID); - dlqService.send(StoreContext.EMPTY, CONSUMER_GROUP_ID, msg).join(); + dlqService.send(StoreContext.EMPTY, CONSUMER_GROUP_ID, msg.message()).join(); Mockito.verify(messageStore, Mockito.times(0)).put(Mockito.any(), Mockito.any(FlatMessage.class)); // 3. DLQ topic is the same as original topic @@ -163,7 +163,7 @@ public void send_group_not_allowed_dlq() { Mockito.doReturn(CompletableFuture.completedFuture(consumerGroup)) .when(metadataService).consumerGroupOf(CONSUMER_GROUP_ID); - dlqService.send(StoreContext.EMPTY, CONSUMER_GROUP_ID, msg).join(); + dlqService.send(StoreContext.EMPTY, CONSUMER_GROUP_ID, msg.message()).join(); Mockito.verify(messageStore, Mockito.times(0)).put(Mockito.any(), Mockito.any(FlatMessage.class)); // 4. DLQ topic doesn't accept DLQ message @@ -187,7 +187,7 @@ public void send_group_not_allowed_dlq() { Mockito.doReturn(CompletableFuture.completedFuture(consumerGroup)) .when(metadataService).consumerGroupOf(CONSUMER_GROUP_ID); - dlqService.send(StoreContext.EMPTY, CONSUMER_GROUP_ID, msg).join(); + dlqService.send(StoreContext.EMPTY, CONSUMER_GROUP_ID, msg.message()).join(); Mockito.verify(messageStore, Mockito.times(0)).put(Mockito.any(), Mockito.any(FlatMessage.class)); // 5. DLQ topic not exist @@ -203,7 +203,7 @@ public void send_group_not_allowed_dlq() { Mockito.doReturn(CompletableFuture.completedFuture(consumerGroup)) .when(metadataService).consumerGroupOf(CONSUMER_GROUP_ID); - dlqService.send(StoreContext.EMPTY, CONSUMER_GROUP_ID, msg).join(); + dlqService.send(StoreContext.EMPTY, CONSUMER_GROUP_ID, msg.message()).join(); Mockito.verify(messageStore, Mockito.times(0)).put(Mockito.any(), Mockito.any(FlatMessage.class)); } } diff --git a/proxy/src/test/java/com/automq/rocketmq/proxy/service/MessageServiceImplTest.java b/proxy/src/test/java/com/automq/rocketmq/proxy/service/MessageServiceImplTest.java index db7d11f27..d5bcad8fb 100644 --- a/proxy/src/test/java/com/automq/rocketmq/proxy/service/MessageServiceImplTest.java +++ b/proxy/src/test/java/com/automq/rocketmq/proxy/service/MessageServiceImplTest.java @@ -18,9 +18,10 @@ package com.automq.rocketmq.proxy.service; import apache.rocketmq.v2.Code; -import com.automq.rocketmq.common.config.ProxyConfig; +import com.automq.rocketmq.common.config.BrokerConfig; import com.automq.rocketmq.metadata.api.ProxyMetadataService; import com.automq.rocketmq.proxy.exception.ProxyException; +import com.automq.rocketmq.proxy.grpc.client.GrpcProxyClient; import com.automq.rocketmq.proxy.mock.MockMessageStore; import com.automq.rocketmq.proxy.mock.MockProxyMetadataService; import com.automq.rocketmq.proxy.model.ProxyContextExt; @@ -100,10 +101,10 @@ public static void setUpAll() throws Exception { public void setUp() throws StoreException { metadataService = new MockProxyMetadataService(); messageStore = new MockMessageStore(); - ProxyConfig config = new ProxyConfig(); + BrokerConfig config = new BrokerConfig(); deadLetterSender = Mockito.mock(DeadLetterSender.class); Mockito.doReturn(CompletableFuture.completedFuture(null)).when(deadLetterSender).send(Mockito.any(), Mockito.anyLong(), Mockito.any()); - messageService = new MessageServiceImpl(config, messageStore, metadataService, new LockService(config), deadLetterSender, new ProducerManager()); + messageService = new MessageServiceImpl(config, messageStore, metadataService, new LockService(config.proxy()), deadLetterSender, new ProducerManager(), new GrpcProxyClient(config)); } @Test diff --git a/store/src/main/java/com/automq/rocketmq/store/api/DeadLetterSender.java b/store/src/main/java/com/automq/rocketmq/store/api/DeadLetterSender.java index 9ebc49025..6ce7e9ea3 100644 --- a/store/src/main/java/com/automq/rocketmq/store/api/DeadLetterSender.java +++ b/store/src/main/java/com/automq/rocketmq/store/api/DeadLetterSender.java @@ -17,7 +17,7 @@ package com.automq.rocketmq.store.api; -import com.automq.rocketmq.common.model.FlatMessageExt; +import com.automq.rocketmq.common.model.generated.FlatMessage; import com.automq.rocketmq.common.trace.TraceContext; import java.util.concurrent.CompletableFuture; @@ -27,7 +27,7 @@ public interface DeadLetterSender { * * @param context trace context * @param consumerGroupId consumer group id - * @param originalFlatMessage original message + * @param message original message */ - CompletableFuture send(TraceContext context, long consumerGroupId, FlatMessageExt originalFlatMessage); + CompletableFuture send(TraceContext context, long consumerGroupId, FlatMessage message); } diff --git a/store/src/main/java/com/automq/rocketmq/store/service/ReviveService.java b/store/src/main/java/com/automq/rocketmq/store/service/ReviveService.java index bb0274291..da8f78730 100644 --- a/store/src/main/java/com/automq/rocketmq/store/service/ReviveService.java +++ b/store/src/main/java/com/automq/rocketmq/store/service/ReviveService.java @@ -193,7 +193,7 @@ public void run() { if (consumeTimes >= maxDeliveryAttempts) { messageExt.setDeliveryAttempts(consumeTimes); // Send to dead letter topic specified in consumer group config. - return deadLetterSender.send(context, consumerGroupId, messageExt) + return deadLetterSender.send(context, consumerGroupId, messageExt.message()) .thenApply(nil -> Pair.of(true, logicQueue)); } return CompletableFuture.completedFuture(Pair.of(false, logicQueue)); @@ -203,7 +203,7 @@ public void run() { if (messageExt.deliveryAttempts() >= maxDeliveryAttempts) { // Send to dead letter topic specified in consumer group config. - return deadLetterSender.send(context, consumerGroupId, messageExt) + return deadLetterSender.send(context, consumerGroupId, messageExt.message()) .thenApply(nil -> Pair.of(true, logicQueue)); } messageExt.setOriginalQueueOffset(messageExt.originalOffset()); diff --git a/store/src/test/java/com/automq/rocketmq/store/MessageStoreTest.java b/store/src/test/java/com/automq/rocketmq/store/MessageStoreTest.java index ac9c540f2..74896cb39 100644 --- a/store/src/test/java/com/automq/rocketmq/store/MessageStoreTest.java +++ b/store/src/test/java/com/automq/rocketmq/store/MessageStoreTest.java @@ -104,7 +104,7 @@ public void setUp() throws Exception { logicQueueManager = new DefaultLogicQueueManager(config, streamStore, kvService, timerService, metadataService, operationLogService, inflightService, streamReclaimService); DeadLetterSender deadLetterSender = Mockito.mock(DeadLetterSender.class); Mockito.doReturn(CompletableFuture.completedFuture(null)) - .when(deadLetterSender).send(Mockito.any(), Mockito.anyLong(), Mockito.any(FlatMessageExt.class)); + .when(deadLetterSender).send(Mockito.any(), Mockito.anyLong(), Mockito.any(FlatMessage.class)); MessageArrivalNotificationService messageArrivalNotificationService = new MessageArrivalNotificationService(); reviveService = new ReviveService(KV_NAMESPACE_CHECK_POINT, kvService, timerService, metadataService, messageArrivalNotificationService, logicQueueManager, deadLetterSender); diff --git a/store/src/test/java/com/automq/rocketmq/store/service/ReviveServiceTest.java b/store/src/test/java/com/automq/rocketmq/store/service/ReviveServiceTest.java index e013b57cd..38ea06fe9 100644 --- a/store/src/test/java/com/automq/rocketmq/store/service/ReviveServiceTest.java +++ b/store/src/test/java/com/automq/rocketmq/store/service/ReviveServiceTest.java @@ -104,7 +104,7 @@ void revive_normal() throws StoreException { FlatMessageExt flatMessageExt = ink.getArgument(2); assertNotNull(flatMessageExt); return CompletableFuture.completedFuture(null); - }).when(deadLetterSender).send(Mockito.any(), Mockito.anyLong(), Mockito.any(FlatMessageExt.class)); + }).when(deadLetterSender).send(Mockito.any(), Mockito.anyLong(), Mockito.any(FlatMessage.class)); // mock max delivery attempts Mockito.doReturn(CompletableFuture.completedFuture(2)) .when(metadataService).maxDeliveryAttemptsOf(Mockito.anyLong()); @@ -152,7 +152,7 @@ void revive_normal() throws StoreException { // check if this message has been sent to DLQ Mockito.verify(deadLetterSender, Mockito.times(1)) - .send(Mockito.any(), Mockito.anyLong(), Mockito.any(FlatMessageExt.class)); + .send(Mockito.any(), Mockito.anyLong(), Mockito.any(FlatMessage.class)); PopResult popResult1 = logicQueue.popRetry(StoreContext.EMPTY, CONSUMER_GROUP_ID, Filter.DEFAULT_FILTER, 1, invisibleDuration).join(); assertEquals(0, popResult1.messageList().size()); } @@ -162,10 +162,10 @@ void revive_fifo() throws StoreException { Mockito.doAnswer(ink -> { long consumerGroupId = ink.getArgument(1); assertEquals(CONSUMER_GROUP_ID, consumerGroupId); - FlatMessageExt flatMessageExt = ink.getArgument(2); - assertNotNull(flatMessageExt); + FlatMessage flatMessage = ink.getArgument(2); + assertNotNull(flatMessage); return CompletableFuture.completedFuture(null); - }).when(deadLetterSender).send(Mockito.any(), Mockito.anyLong(), Mockito.any(FlatMessageExt.class)); + }).when(deadLetterSender).send(Mockito.any(), Mockito.anyLong(), Mockito.any(FlatMessage.class)); // mock max delivery attempts Mockito.doReturn(CompletableFuture.completedFuture(2)) .when(metadataService).maxDeliveryAttemptsOf(Mockito.anyLong()); @@ -212,7 +212,7 @@ void revive_fifo() throws StoreException { // check if this message has been sent to DLQ Mockito.verify(deadLetterSender, Mockito.times(1)) - .send(Mockito.any(), Mockito.anyLong(), Mockito.any(FlatMessageExt.class)); + .send(Mockito.any(), Mockito.anyLong(), Mockito.any(FlatMessage.class)); assertEquals(1, logicQueue.getAckOffset(CONSUMER_GROUP_ID)); @@ -229,10 +229,10 @@ void revive_dead_letter() throws Exception { Mockito.doAnswer(ink -> { long consumerGroupId = ink.getArgument(1); assertEquals(CONSUMER_GROUP_ID, consumerGroupId); - FlatMessageExt flatMessageExt = ink.getArgument(2); - assertNotNull(flatMessageExt); + FlatMessage flatMessage = ink.getArgument(2); + assertNotNull(flatMessage); return CompletableFuture.completedFuture(null); - }).when(deadLetterSender).send(Mockito.any(), Mockito.anyLong(), Mockito.any(FlatMessageExt.class)); + }).when(deadLetterSender).send(Mockito.any(), Mockito.anyLong(), Mockito.any(FlatMessage.class)); // mock max delivery attempts Mockito.doReturn(CompletableFuture.completedFuture(2)) .when(metadataService).maxDeliveryAttemptsOf(Mockito.anyLong()); @@ -279,7 +279,7 @@ void revive_dead_letter() throws Exception { timerService.dequeue(); // check if this message has been sent to DLQ - Mockito.verify(deadLetterSender, Mockito.times(1)).send(Mockito.any(), Mockito.anyLong(), Mockito.any(FlatMessageExt.class)); + Mockito.verify(deadLetterSender, Mockito.times(1)).send(Mockito.any(), Mockito.anyLong(), Mockito.any(FlatMessage.class)); // check ck not exist assertTrue(retryPopResult.messageList().get(0).receiptHandle().isPresent()); handle = SerializeUtil.decodeReceiptHandle(retryPopResult.messageList().get(0).receiptHandle().get());