Skip to content

Commit

Permalink
refactor(proxy): proxy message to the node assigned to its queue (#812)
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <admin@lv5.moe>
  • Loading branch information
ShadowySpirits authored Dec 8, 2023
1 parent 853f566 commit 7736c5d
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public BrokerController(BrokerConfig brokerConfig) throws Exception {
LockService lockService = new LockService(brokerConfig.proxy());

ProducerManager producerManager = new ProducerManager();
MessageServiceImpl messageServiceImpl = new MessageServiceImpl(brokerConfig.proxy(), messageStore, proxyMetadataService, lockService, dlqService, producerManager);
MessageServiceImpl messageServiceImpl = new MessageServiceImpl(brokerConfig, messageStore, proxyMetadataService, lockService, dlqService, producerManager, relayClient);
this.messageService = messageServiceImpl;
this.extendMessageService = messageServiceImpl;
ConsumerManager consumerManager = new ConsumerManager(new DefaultServiceManager.ConsumerIdsChangeListenerImpl(), brokerConfig.proxy().channelExpiredTimeout());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import apache.rocketmq.controller.v1.MessageType;
import apache.rocketmq.controller.v1.Topic;
import com.automq.rocketmq.common.config.BrokerConfig;
import com.automq.rocketmq.common.model.FlatMessageExt;
import com.automq.rocketmq.common.model.generated.FlatMessage;
import com.automq.rocketmq.common.system.MessageConstants;
import com.automq.rocketmq.common.trace.TraceContext;
Expand Down Expand Up @@ -66,24 +65,24 @@ public void init(MessageStore messageStore) {

@Override
@WithSpan
public CompletableFuture<Void> send(TraceContext context, long consumerGroupId, FlatMessageExt flatMessageExt) {
public CompletableFuture<Void> send(TraceContext context, long consumerGroupId, FlatMessage message) {
if (messageStore == null) {
return CompletableFuture.failedFuture(new IllegalStateException("Message store is not initialized"));
}

CompletableFuture<Topic> dlqQueryCf = metadataService.consumerGroupOf(consumerGroupId)
.thenComposeAsync(consumerGroup -> {
long deadLetterTopicId = consumerGroup.getDeadLetterTopicId();
long topicId = flatMessageExt.message().topicId();
long topicId = message.topicId();
if (deadLetterTopicId == MessageConstants.UNINITIALIZED_TOPIC_ID) {
// not allow to send to DLQ
LOGGER.warn("Message: {} is dropped because the consumer group: {} doesn't have DLQ topic",
flatMessageExt, consumerGroupId);
message.systemProperties().messageId(), consumerGroupId);
return CompletableFuture.completedFuture(null);
}
if (deadLetterTopicId == topicId) {
LOGGER.error("Message: {} is dropped because the consumer group: {} has the same DLQ topic: {} with original topic",
flatMessageExt, consumerGroupId, topicId);
message.systemProperties().messageId(), consumerGroupId, topicId);
return CompletableFuture.completedFuture(null);
}
// get dlq topic info
Expand All @@ -98,17 +97,16 @@ public CompletableFuture<Void> send(TraceContext context, long consumerGroupId,
if (!(dlqTopic.getAcceptTypes().getTypesList().contains(MessageType.NORMAL)
|| dlqTopic.getAcceptTypes().getTypesList().contains(MessageType.FIFO))) {
LOGGER.error("Message: {} is dropped because the consumer group: {} has invalid DLQ topic: {}",
flatMessageExt, consumerGroupId, dlqTopic);
message.systemProperties().messageId(), consumerGroupId, dlqTopic);
return CompletableFuture.completedFuture(null);
}

FlatMessage message = flatMessageExt.message();
message.mutateTopicId(dlqTopic.getTopicId());

List<MessageQueueAssignment> assignmentsList = new ArrayList<>(dlqTopic.getAssignmentsList());
if (assignmentsList.isEmpty()) {
LOGGER.error("Message: {} is dropped because the consumer group: {} has empty DLQ topic: {}",
flatMessageExt, consumerGroupId, dlqTopic);
message.systemProperties().messageId(), consumerGroupId, dlqTopic);
return CompletableFuture.completedFuture(null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,21 @@

import apache.rocketmq.common.v1.Code;
import apache.rocketmq.controller.v1.ConsumerGroup;
import apache.rocketmq.controller.v1.MessageQueueAssignment;
import apache.rocketmq.controller.v1.StreamRole;
import apache.rocketmq.controller.v1.SubscriptionMode;
import apache.rocketmq.controller.v1.Topic;
import apache.rocketmq.proxy.v1.QueueStats;
import apache.rocketmq.proxy.v1.StreamStats;
import com.automq.rocketmq.common.config.BrokerConfig;
import com.automq.rocketmq.common.config.ProxyConfig;
import com.automq.rocketmq.common.exception.ControllerException;
import com.automq.rocketmq.common.model.FlatMessageExt;
import com.automq.rocketmq.common.model.generated.FlatMessage;
import com.automq.rocketmq.common.util.CommonUtil;
import com.automq.rocketmq.metadata.api.ProxyMetadataService;
import com.automq.rocketmq.proxy.exception.ProxyException;
import com.automq.rocketmq.proxy.grpc.ProxyClient;
import com.automq.rocketmq.proxy.metrics.ProxyMetricsManager;
import com.automq.rocketmq.proxy.model.ProxyContextExt;
import com.automq.rocketmq.proxy.model.VirtualQueue;
Expand Down Expand Up @@ -118,26 +121,30 @@

public class MessageServiceImpl implements MessageService, ExtendMessageService {
private static final Logger LOGGER = LoggerFactory.getLogger(MessageServiceImpl.class);
private final BrokerConfig brokerConfig;
private final ProxyConfig config;
private final ProxyMetadataService metadataService;
private final MessageStore store;
private final LockService lockService;
private final DeadLetterSender deadLetterService;
private final SuspendRequestService suspendRequestService;
private final ProducerManager producerManager;
private final ProxyClient relayClient;
private final ExecutorService executorService = ThreadPoolMonitor.createAndMonitor(2, 5, 100, TimeUnit.SECONDS,
"Transaction-msg-check-thread", 2000);

public MessageServiceImpl(ProxyConfig config, MessageStore store, ProxyMetadataService metadataService,
LockService lockService, DeadLetterSender deadLetterService,
ProducerManager producerManager) throws StoreException {
this.config = config;
public MessageServiceImpl(BrokerConfig config, MessageStore store, ProxyMetadataService metadataService,
LockService lockService, DeadLetterSender deadLetterService, ProducerManager producerManager,
ProxyClient relayClient) throws StoreException {
this.brokerConfig = config;
this.config = config.proxy();
this.store = store;
this.metadataService = metadataService;
this.deadLetterService = deadLetterService;
this.lockService = lockService;
this.suspendRequestService = SuspendRequestService.getInstance();
this.producerManager = producerManager;
this.relayClient = relayClient;
store.registerTransactionCheckHandler(timerTag -> executorService.execute(() -> {
try {
checkTransactionStatus(timerTag);
Expand Down Expand Up @@ -187,20 +194,24 @@ public CompletableFuture<List<SendResult>> sendMessage(ProxyContext ctx,
ProxyContextExt contextExt = (ProxyContextExt) ctx;
FlatMessage flatMessage = FlatMessageUtil.convertTo(contextExt, topic.getTopicId(), virtualQueue.physicalQueueId(), config.hostName(), message);

Optional<MessageQueueAssignment> optional = topic.getAssignmentsList().stream().filter(assignment -> assignment.getQueue().getQueueId() == flatMessage.queueId()).findFirst();
if (optional.isEmpty()) {
LOGGER.error("Message: {} is dropped because the topic: {} doesn't have queue: {}",
messageId, topic.getName(), flatMessage.queueId());
return CompletableFuture.failedFuture(new ProxyException(apache.rocketmq.v2.Code.BAD_REQUEST, "Queue " + flatMessage.queueId() + " is not assigned to any node."));
}
MessageQueueAssignment assignment = optional.get();

if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
flatMessage.systemProperties().mutateDeliveryAttempts(requestHeader.getReconsumeTimes() + 1);
if (requestHeader.getReconsumeTimes() > requestHeader.getMaxReconsumeTimes()) {
String groupName = requestHeader.getTopic().replace(MixAll.RETRY_GROUP_TOPIC_PREFIX, "");
FlatMessageExt flatMessageExt = FlatMessageExt.Builder.builder()
.message(flatMessage)
.offset(0)
.build();
contextExt.span().ifPresent(span -> {
span.setAttribute("deadLetter", true);
span.setAttribute("group", groupName);
});
return consumerGroupOf(groupName)
.thenCompose(group -> deadLetterService.send(contextExt, group.getGroupId(), flatMessageExt))
.thenCompose(group -> deadLetterService.send(contextExt, group.getGroupId(), flatMessage))
.thenApply(ignore -> new PutResult(PutResult.Status.PUT_OK, 0));
} else {
String groupName = requestHeader.getTopic().replace(MixAll.RETRY_GROUP_TOPIC_PREFIX, "");
Expand All @@ -210,10 +221,20 @@ public CompletableFuture<List<SendResult>> sendMessage(ProxyContext ctx,
span.setAttribute("reconsumeTimes", requestHeader.getReconsumeTimes());
span.setAttribute("deliveryTimestamp", flatMessage.systemProperties().deliveryTimestamp());
});
if (assignment.getNodeId() != brokerConfig.nodeId()) {
return metadataService.addressOf(assignment.getNodeId())
.thenCompose(address -> relayClient.relayMessage(address, flatMessage))
.thenApply(status -> new PutResult(PutResult.Status.PUT_OK, 0));
}
return store.put(ContextUtil.buildStoreContext(ctx, topic.getName(), groupName), flatMessage);
}
}

if (assignment.getNodeId() != brokerConfig.nodeId()) {
return metadataService.addressOf(assignment.getNodeId())
.thenCompose(address -> relayClient.relayMessage(address, flatMessage))
.thenApply(status -> new PutResult(PutResult.Status.PUT_OK, 0));
}
return store.put(ContextUtil.buildStoreContext(ctx, topic.getName(), ""), flatMessage);
});

Expand Down Expand Up @@ -274,15 +295,16 @@ public CompletableFuture<RemotingCommand> sendMessageBack(ProxyContext ctx, Rece
throw new ProxyException(apache.rocketmq.v2.Code.MESSAGE_NOT_FOUND, "Message not found from server.");
}).thenCompose(messageExt -> {
if (messageExt.deliveryAttempts() > group.getMaxDeliveryAttempt()) {
return deadLetterService.send((ProxyContextExt) ctx, group.getGroupId(), messageExt);
return deadLetterService.send((ProxyContextExt) ctx, group.getGroupId(), messageExt.message());
}

// Message consume retry strategy
// <0: no retry,put into DLQ directly
// =0: broker control retry frequency
// >0: client control retry frequency
return switch (Integer.compare(delayLevel, 0)) {
case -1 -> deadLetterService.send((ProxyContextExt) ctx, group.getGroupId(), messageExt);
case -1 ->
deadLetterService.send((ProxyContextExt) ctx, group.getGroupId(), messageExt.message());
case 0 -> topicOf(MixAll.RETRY_GROUP_TOPIC_PREFIX + requestHeader.getGroup())
.thenCompose(retryTopic -> {
// Keep the same logic as apache RocketMQ.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void send_normal() {
return CompletableFuture.completedFuture(new PutResult(PutResult.Status.PUT_OK, 0));
}).when(messageStore).put(Mockito.any(), Mockito.any(FlatMessage.class));

dlqService.send(StoreContext.EMPTY, CONSUMER_GROUP_ID, msg).join();
dlqService.send(StoreContext.EMPTY, CONSUMER_GROUP_ID, msg.message()).join();
Mockito.verify(messageStore, Mockito.times(1)).put(Mockito.any(), Mockito.any(FlatMessage.class));
}

Expand All @@ -132,7 +132,7 @@ public void send_group_not_allowed_dlq() {

FlatMessageExt msg = MockMessageUtil.buildMessage(TOPIC_ID, QUEUE_ID, "TAG_DLQ");

dlqService.send(StoreContext.EMPTY, CONSUMER_GROUP_ID, msg).join();
dlqService.send(StoreContext.EMPTY, CONSUMER_GROUP_ID, msg.message()).join();
Mockito.verify(messageStore, Mockito.times(0)).put(Mockito.any(), Mockito.any(FlatMessage.class));

// 2. DLQ topic is configured but has no assignment
Expand All @@ -147,7 +147,7 @@ public void send_group_not_allowed_dlq() {
Mockito.doReturn(CompletableFuture.completedFuture(consumerGroup))
.when(metadataService).consumerGroupOf(CONSUMER_GROUP_ID);

dlqService.send(StoreContext.EMPTY, CONSUMER_GROUP_ID, msg).join();
dlqService.send(StoreContext.EMPTY, CONSUMER_GROUP_ID, msg.message()).join();
Mockito.verify(messageStore, Mockito.times(0)).put(Mockito.any(), Mockito.any(FlatMessage.class));

// 3. DLQ topic is the same as original topic
Expand All @@ -163,7 +163,7 @@ public void send_group_not_allowed_dlq() {
Mockito.doReturn(CompletableFuture.completedFuture(consumerGroup))
.when(metadataService).consumerGroupOf(CONSUMER_GROUP_ID);

dlqService.send(StoreContext.EMPTY, CONSUMER_GROUP_ID, msg).join();
dlqService.send(StoreContext.EMPTY, CONSUMER_GROUP_ID, msg.message()).join();
Mockito.verify(messageStore, Mockito.times(0)).put(Mockito.any(), Mockito.any(FlatMessage.class));

// 4. DLQ topic doesn't accept DLQ message
Expand All @@ -187,7 +187,7 @@ public void send_group_not_allowed_dlq() {
Mockito.doReturn(CompletableFuture.completedFuture(consumerGroup))
.when(metadataService).consumerGroupOf(CONSUMER_GROUP_ID);

dlqService.send(StoreContext.EMPTY, CONSUMER_GROUP_ID, msg).join();
dlqService.send(StoreContext.EMPTY, CONSUMER_GROUP_ID, msg.message()).join();
Mockito.verify(messageStore, Mockito.times(0)).put(Mockito.any(), Mockito.any(FlatMessage.class));

// 5. DLQ topic not exist
Expand All @@ -203,7 +203,7 @@ public void send_group_not_allowed_dlq() {
Mockito.doReturn(CompletableFuture.completedFuture(consumerGroup))
.when(metadataService).consumerGroupOf(CONSUMER_GROUP_ID);

dlqService.send(StoreContext.EMPTY, CONSUMER_GROUP_ID, msg).join();
dlqService.send(StoreContext.EMPTY, CONSUMER_GROUP_ID, msg.message()).join();
Mockito.verify(messageStore, Mockito.times(0)).put(Mockito.any(), Mockito.any(FlatMessage.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
package com.automq.rocketmq.proxy.service;

import apache.rocketmq.v2.Code;
import com.automq.rocketmq.common.config.ProxyConfig;
import com.automq.rocketmq.common.config.BrokerConfig;
import com.automq.rocketmq.metadata.api.ProxyMetadataService;
import com.automq.rocketmq.proxy.exception.ProxyException;
import com.automq.rocketmq.proxy.grpc.client.GrpcProxyClient;
import com.automq.rocketmq.proxy.mock.MockMessageStore;
import com.automq.rocketmq.proxy.mock.MockProxyMetadataService;
import com.automq.rocketmq.proxy.model.ProxyContextExt;
Expand Down Expand Up @@ -100,10 +101,10 @@ public static void setUpAll() throws Exception {
public void setUp() throws StoreException {
metadataService = new MockProxyMetadataService();
messageStore = new MockMessageStore();
ProxyConfig config = new ProxyConfig();
BrokerConfig config = new BrokerConfig();
deadLetterSender = Mockito.mock(DeadLetterSender.class);
Mockito.doReturn(CompletableFuture.completedFuture(null)).when(deadLetterSender).send(Mockito.any(), Mockito.anyLong(), Mockito.any());
messageService = new MessageServiceImpl(config, messageStore, metadataService, new LockService(config), deadLetterSender, new ProducerManager());
messageService = new MessageServiceImpl(config, messageStore, metadataService, new LockService(config.proxy()), deadLetterSender, new ProducerManager(), new GrpcProxyClient(config));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package com.automq.rocketmq.store.api;

import com.automq.rocketmq.common.model.FlatMessageExt;
import com.automq.rocketmq.common.model.generated.FlatMessage;
import com.automq.rocketmq.common.trace.TraceContext;
import java.util.concurrent.CompletableFuture;

Expand All @@ -27,7 +27,7 @@ public interface DeadLetterSender {
*
* @param context trace context
* @param consumerGroupId consumer group id
* @param originalFlatMessage original message
* @param message original message
*/
CompletableFuture<Void> send(TraceContext context, long consumerGroupId, FlatMessageExt originalFlatMessage);
CompletableFuture<Void> send(TraceContext context, long consumerGroupId, FlatMessage message);
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public void run() {
if (consumeTimes >= maxDeliveryAttempts) {
messageExt.setDeliveryAttempts(consumeTimes);
// Send to dead letter topic specified in consumer group config.
return deadLetterSender.send(context, consumerGroupId, messageExt)
return deadLetterSender.send(context, consumerGroupId, messageExt.message())
.thenApply(nil -> Pair.of(true, logicQueue));
}
return CompletableFuture.completedFuture(Pair.of(false, logicQueue));
Expand All @@ -203,7 +203,7 @@ public void run() {

if (messageExt.deliveryAttempts() >= maxDeliveryAttempts) {
// Send to dead letter topic specified in consumer group config.
return deadLetterSender.send(context, consumerGroupId, messageExt)
return deadLetterSender.send(context, consumerGroupId, messageExt.message())
.thenApply(nil -> Pair.of(true, logicQueue));
}
messageExt.setOriginalQueueOffset(messageExt.originalOffset());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void setUp() throws Exception {
logicQueueManager = new DefaultLogicQueueManager(config, streamStore, kvService, timerService, metadataService, operationLogService, inflightService, streamReclaimService);
DeadLetterSender deadLetterSender = Mockito.mock(DeadLetterSender.class);
Mockito.doReturn(CompletableFuture.completedFuture(null))
.when(deadLetterSender).send(Mockito.any(), Mockito.anyLong(), Mockito.any(FlatMessageExt.class));
.when(deadLetterSender).send(Mockito.any(), Mockito.anyLong(), Mockito.any(FlatMessage.class));
MessageArrivalNotificationService messageArrivalNotificationService = new MessageArrivalNotificationService();
reviveService = new ReviveService(KV_NAMESPACE_CHECK_POINT, kvService, timerService, metadataService, messageArrivalNotificationService,
logicQueueManager, deadLetterSender);
Expand Down
Loading

0 comments on commit 7736c5d

Please sign in to comment.