From 20b67d98d7764ab08ff5226743d4e19b027c7591 Mon Sep 17 00:00:00 2001 From: SSpirits Date: Wed, 6 Dec 2023 11:57:18 +0800 Subject: [PATCH] feat(proxy): add topicStats rpc Signed-off-by: SSpirits --- proto/src/main/proto/proxy/proxy.proto | 41 +++++++ .../rocketmq/proxy/grpc/ProxyServiceImpl.java | 32 +++++- .../proxy/service/ExtendMessageService.java | 12 ++ .../proxy/service/MessageServiceImpl.java | 103 +++++++++++++++++- .../rocketmq/proxy/mock/MockMessageStore.java | 14 ++- .../proxy/service/MessageServiceImplTest.java | 2 +- .../rocketmq/store/MessageStoreImpl.java | 35 +++++- .../automq/rocketmq/store/api/LogicQueue.java | 6 +- .../rocketmq/store/api/MessageStore.java | 22 +++- .../store/queue/StreamLogicQueue.java | 37 ++++++- .../rocketmq/store/MessageStoreTest.java | 12 +- 11 files changed, 284 insertions(+), 32 deletions(-) diff --git a/proto/src/main/proto/proxy/proxy.proto b/proto/src/main/proto/proxy/proxy.proto index b8eea8507..bbb77ad93 100644 --- a/proto/src/main/proto/proxy/proxy.proto +++ b/proto/src/main/proto/proxy/proxy.proto @@ -3,6 +3,7 @@ syntax = "proto3"; package apache.rocketmq.proxy.v1; import "common.proto"; +import "controller/model.proto"; option java_multiple_files = true; option java_package = "apache.rocketmq.proxy.v1"; @@ -45,8 +46,48 @@ message ResetConsumeOffsetByTimestampRequest { int64 timestamp = 5; } +message TopicStatsRequest { + ProxyRequestContext context = 1; + // Topic name, required + string topic = 2; + // Queue id, -1 means query all queue + int32 queue_id = 3; + // Consumer group name + string group = 4; +} + +message StreamStats { + // Stream id + int64 stream_id = 1; + // Stream role + controller.v1.StreamRole role = 2; + // Min message offset + int64 min_offset = 3; + // Max message offset + int64 max_offset = 4; + // Consume offset, if consumer group name not specified, return -1 + int64 consume_offset = 5; +} + +message QueueStats { + // Queue id + int32 queue_id = 1; + // Stream stats which belong to this topic + repeated StreamStats stream_stats = 2; +} + +message TopicStatsReply { + Status status = 1; + // Topic id + int64 id = 2; + // Topic name + string name = 3; + // Queue stats + repeated QueueStats queue_stats = 4; +} service ProxyService { rpc resetConsumeOffset(ResetConsumeOffsetRequest) returns (ResetConsumeOffsetReply) {} rpc resetConsumeOffsetByTimestamp(ResetConsumeOffsetByTimestampRequest) returns (ResetConsumeOffsetReply) {} + rpc topicStats(TopicStatsRequest) returns (TopicStatsReply) {} } \ No newline at end of file diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImpl.java b/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImpl.java index 215a1c400..425bec3d8 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImpl.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImpl.java @@ -19,13 +19,17 @@ import apache.rocketmq.common.v1.Code; import apache.rocketmq.proxy.v1.ProxyServiceGrpc; +import apache.rocketmq.proxy.v1.QueueStats; import apache.rocketmq.proxy.v1.ResetConsumeOffsetByTimestampRequest; -import apache.rocketmq.proxy.v1.ResetConsumeOffsetRequest; import apache.rocketmq.proxy.v1.ResetConsumeOffsetReply; +import apache.rocketmq.proxy.v1.ResetConsumeOffsetRequest; import apache.rocketmq.proxy.v1.Status; +import apache.rocketmq.proxy.v1.TopicStatsReply; +import apache.rocketmq.proxy.v1.TopicStatsRequest; import com.automq.rocketmq.proxy.service.ExtendMessageService; import com.google.protobuf.TextFormat; import io.grpc.stub.StreamObserver; +import java.util.List; import org.slf4j.Logger; public class ProxyServiceImpl extends ProxyServiceGrpc.ProxyServiceImplBase { @@ -78,4 +82,30 @@ public void resetConsumeOffsetByTimestamp(ResetConsumeOffsetByTimestampRequest r responseObserver.onCompleted(); }); } + + @Override + public void topicStats(TopicStatsRequest request, StreamObserver responseObserver) { + messageService.getTopicStats(request.getTopic(), request.getQueueId(), request.getGroup()) + .whenComplete((pair, e) -> { + if (e != null) { + responseObserver.onError(e); + return; + } + + Long topicId = pair.getLeft(); + List queueStatsList = pair.getRight(); + TopicStatsReply reply = TopicStatsReply.newBuilder() + .setStatus(Status + .newBuilder() + .setCode(Code.OK) + .build()) + .setId(topicId) + .setName(request.getTopic()) + .addAllQueueStats(queueStatsList) + .build(); + + responseObserver.onNext(reply); + responseObserver.onCompleted(); + }); + } } diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/service/ExtendMessageService.java b/proxy/src/main/java/com/automq/rocketmq/proxy/service/ExtendMessageService.java index 5830bc13c..bd361560f 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/service/ExtendMessageService.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/service/ExtendMessageService.java @@ -17,7 +17,10 @@ package com.automq.rocketmq.proxy.service; +import apache.rocketmq.proxy.v1.QueueStats; +import java.util.List; import java.util.concurrent.CompletableFuture; +import org.apache.commons.lang3.tuple.Pair; public interface ExtendMessageService { @@ -41,4 +44,13 @@ public interface ExtendMessageService { */ CompletableFuture resetConsumeOffsetByTimestamp(String topic, int queueId, String consumerGroup, long timestamp); + /** + * Get the stats of the given topic. + * + * @param topic The topic name. + * @param queueId The queue id of the queue. + * @param consumerGroup The consumer group. + * @return The stats of the given topic. + */ + CompletableFuture>> getTopicStats(String topic, int queueId, String consumerGroup); } diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java b/proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java index 419c9c86e..20d303b05 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java @@ -19,8 +19,11 @@ import apache.rocketmq.common.v1.Code; import apache.rocketmq.controller.v1.ConsumerGroup; +import apache.rocketmq.controller.v1.StreamRole; import apache.rocketmq.controller.v1.SubscriptionMode; import apache.rocketmq.controller.v1.Topic; +import apache.rocketmq.proxy.v1.QueueStats; +import apache.rocketmq.proxy.v1.StreamStats; import com.automq.rocketmq.common.config.ProxyConfig; import com.automq.rocketmq.common.exception.ControllerException; import com.automq.rocketmq.common.model.FlatMessageExt; @@ -35,6 +38,7 @@ import com.automq.rocketmq.proxy.util.FlatMessageUtil; import com.automq.rocketmq.proxy.util.ReceiptHandleUtil; import com.automq.rocketmq.store.api.DeadLetterSender; +import com.automq.rocketmq.store.api.LogicQueue; import com.automq.rocketmq.store.api.MessageStore; import com.automq.rocketmq.store.exception.StoreException; import com.automq.rocketmq.store.model.StoreContext; @@ -53,6 +57,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -561,7 +566,7 @@ public CompletableFuture queryConsumerOffset(ProxyContext ctx, Addressable pair.getRight().getTopicId(), virtualQueue.physicalQueueId()); } - return store.getConsumeOffset(consumerGroup.getGroupId(), topic.getTopicId(), virtualQueue.physicalQueueId()); + return CompletableFuture.completedFuture(store.getConsumeOffset(consumerGroup.getGroupId(), topic.getTopicId(), virtualQueue.physicalQueueId())); }); } @@ -715,15 +720,103 @@ public CompletableFuture unlockBatchMQ(ProxyContext ctx, AddressableMessag @Override public CompletableFuture getMaxOffset(ProxyContext ctx, AddressableMessageQueue messageQueue, GetMaxOffsetRequestHeader requestHeader, long timeoutMillis) { - // TODO: Support in the next iteration - throw new UnsupportedOperationException(); + String topic = requestHeader.getTopic(); + int queueId = requestHeader.getQueueId(); + + return topicOf(topic).thenApply(topicMetadata -> { + Optional dataStreamRange = store.getOffsetRange(topicMetadata.getTopicId(), queueId, -1) + .stream() + .filter(offsetRange -> offsetRange.streamRole() == StreamRole.STREAM_ROLE_DATA) + .findFirst(); + if (dataStreamRange.isEmpty()) { + throw new ProxyException(apache.rocketmq.v2.Code.BAD_REQUEST, "Topic is not opened in this node."); + } + return dataStreamRange.get().endOffset(); + }); } @Override public CompletableFuture getMinOffset(ProxyContext ctx, AddressableMessageQueue messageQueue, GetMinOffsetRequestHeader requestHeader, long timeoutMillis) { - // TODO: Support in the next iteration - throw new UnsupportedOperationException(); + String topic = requestHeader.getTopic(); + int queueId = requestHeader.getQueueId(); + + return topicOf(topic).thenApply(topicMetadata -> { + Optional dataStreamRange = store.getOffsetRange(topicMetadata.getTopicId(), queueId, -1) + .stream() + .filter(offsetRange -> offsetRange.streamRole() == StreamRole.STREAM_ROLE_DATA) + .findFirst(); + if (dataStreamRange.isEmpty()) { + throw new ProxyException(apache.rocketmq.v2.Code.BAD_REQUEST, "Topic is not opened in this node."); + } + return dataStreamRange.get().startOffset(); + }); + } + + private long getConsumerOffset(ConsumerGroup consumerGroup, Topic topic, int queueId, boolean retry) { + if (consumerGroup.getSubMode() == SubscriptionMode.SUB_MODE_PULL) { + if (retry) { + topicOf(MixAll.RETRY_GROUP_TOPIC_PREFIX + consumerGroup.getName()) + .thenCompose(retryTopic -> metadataService.consumerOffsetOf(consumerGroup.getGroupId(), retryTopic.getTopicId(), queueId)); + return metadataService.consumerOffsetOf(consumerGroup.getGroupId(), topic.getTopicId(), queueId).join(); + } + return metadataService.consumerOffsetOf(consumerGroup.getGroupId(), topic.getTopicId(), queueId).join(); + } + + if (retry) { + return store.getRetryConsumeOffset(consumerGroup.getGroupId(), topic.getTopicId(), queueId); + } + return store.getConsumeOffset(consumerGroup.getGroupId(), topic.getTopicId(), queueId); + } + + private List getStreamStats(Optional consumerGroup, Topic topic, int queueId) { + return store.getOffsetRange(topic.getTopicId(), queueId, consumerGroup.map(ConsumerGroup::getGroupId).orElse(-1L)) + .stream().map(offsetRange -> { + long consumerOffset = 0; + if (consumerGroup.isPresent()) { + if (offsetRange.streamRole() == StreamRole.STREAM_ROLE_DATA) { + consumerOffset = getConsumerOffset(consumerGroup.get(), topic, queueId, false); + } else if (offsetRange.streamRole() == StreamRole.STREAM_ROLE_RETRY) { + consumerOffset = getConsumerOffset(consumerGroup.get(), topic, queueId, true); + } + } + return StreamStats.newBuilder() + .setStreamId(offsetRange.streamId()) + .setMinOffset(offsetRange.startOffset()) + .setMaxOffset(offsetRange.endOffset()) + .setRole(offsetRange.streamRole()) + .setConsumeOffset(consumerOffset) + .build(); + }).toList(); + } + + @Override + public CompletableFuture>> getTopicStats(String topic, int queueId, + String consumerGroup) { + CompletableFuture> groupIdFuture; + if (StringUtils.isBlank(consumerGroup)) { + groupIdFuture = CompletableFuture.completedFuture(Optional.empty()); + } else { + groupIdFuture = metadataService.consumerGroupOf(consumerGroup).thenApply(Optional::of); + } + return topicOf(topic) + .thenCombine(groupIdFuture, (topicMetadata, groupMetadata) -> { + long topicId = topicMetadata.getTopicId(); + List queueStatsList = new ArrayList<>(); + if (queueId != -1) { + List streamStatsList = getStreamStats(groupMetadata, topicMetadata, queueId); + queueStatsList.add(QueueStats.newBuilder().setQueueId(queueId).addAllStreamStats(streamStatsList).build()); + } else { + int queueCount = topicMetadata.getCount(); + for (int i = 0; i < queueCount; i++) { + List streamStatsList = getStreamStats(groupMetadata, topicMetadata, i); + if (!streamStatsList.isEmpty()) { + queueStatsList.add(QueueStats.newBuilder().setQueueId(queueId).addAllStreamStats(streamStatsList).build()); + } + } + } + return Pair.of(topicId, queueStatsList); + }); } @Override diff --git a/proxy/src/test/java/com/automq/rocketmq/proxy/mock/MockMessageStore.java b/proxy/src/test/java/com/automq/rocketmq/proxy/mock/MockMessageStore.java index d7b107bbf..d86ab0ad6 100644 --- a/proxy/src/test/java/com/automq/rocketmq/proxy/mock/MockMessageStore.java +++ b/proxy/src/test/java/com/automq/rocketmq/proxy/mock/MockMessageStore.java @@ -17,6 +17,7 @@ package com.automq.rocketmq.proxy.mock; +import apache.rocketmq.controller.v1.StreamRole; import com.automq.rocketmq.common.model.FlatMessageExt; import com.automq.rocketmq.common.model.generated.FlatMessage; import com.automq.rocketmq.store.api.LogicQueue; @@ -159,19 +160,24 @@ public CompletableFuture getInflightStats(long consumerGroupId, long to } @Override - public CompletableFuture getOffsetRange(long topicId, int queueId) { + public List getOffsetRange(long topicId, int queueId, long consumerGroupId) { long startOffset = 0; List messageList = messageMap.computeIfAbsent(topicId + queueId, v -> new ArrayList<>()); if (!messageList.isEmpty()) { startOffset = messageList.get(0).offset(); } long endOffset = offsetMap.computeIfAbsent(topicId + queueId, v -> new AtomicLong()).get(); - return CompletableFuture.completedFuture(new LogicQueue.QueueOffsetRange(startOffset, endOffset)); + return List.of(new LogicQueue.StreamOffsetRange(0, StreamRole.STREAM_ROLE_DATA, startOffset, endOffset)); } @Override - public CompletableFuture getConsumeOffset(long consumerGroupId, long topicId, int queueId) { - return CompletableFuture.completedFuture(consumerOffsetMap.getOrDefault(Pair.of(topicId, queueId), 0L)); + public long getConsumeOffset(long consumerGroupId, long topicId, int queueId) { + return consumerOffsetMap.getOrDefault(Pair.of(topicId, queueId), 0L); + } + + @Override + public long getRetryConsumeOffset(long consumerGroupId, long topicId, int queueId) { + return 0; } @Override diff --git a/proxy/src/test/java/com/automq/rocketmq/proxy/service/MessageServiceImplTest.java b/proxy/src/test/java/com/automq/rocketmq/proxy/service/MessageServiceImplTest.java index 15760773d..994c821a1 100644 --- a/proxy/src/test/java/com/automq/rocketmq/proxy/service/MessageServiceImplTest.java +++ b/proxy/src/test/java/com/automq/rocketmq/proxy/service/MessageServiceImplTest.java @@ -355,7 +355,7 @@ void popMessage() { assertEquals(PopStatus.FOUND, result.getPopStatus()); assertEquals(2, result.getMsgFoundList().size()); // All messages in queue 0 has been consumed - assertEquals(2, messageStore.getConsumeOffset(consumerGroupId, topicId, 0).join()); + assertEquals(2, messageStore.getConsumeOffset(consumerGroupId, topicId, 0)); // Pop again. result = messageService.popMessage(ProxyContextExt.create(), messageQueue, header, 0L).join(); diff --git a/store/src/main/java/com/automq/rocketmq/store/MessageStoreImpl.java b/store/src/main/java/com/automq/rocketmq/store/MessageStoreImpl.java index 044659f89..9204ca4e4 100644 --- a/store/src/main/java/com/automq/rocketmq/store/MessageStoreImpl.java +++ b/store/src/main/java/com/automq/rocketmq/store/MessageStoreImpl.java @@ -50,6 +50,8 @@ import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.instrumentation.annotations.SpanAttribute; import io.opentelemetry.instrumentation.annotations.WithSpan; +import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; @@ -255,15 +257,36 @@ public CompletableFuture getInflightStats(long consumerGroupId, long to } @Override - public CompletableFuture getOffsetRange(long topicId, int queueId) { - return logicQueueManager.getOrCreate(StoreContext.EMPTY, topicId, queueId) - .thenCompose(LogicQueue::getOffsetRange); + public List getOffsetRange(long topicId, int queueId, long consumerGroupId) { + CompletableFuture> future = logicQueueManager.get(topicId, queueId); + if (future.isDone()) { + return future.join() + .map(topicQueue -> topicQueue.getOffsetRange(consumerGroupId)) + .orElse(List.of()); + } + return List.of(); } @Override - public CompletableFuture getConsumeOffset(long consumerGroupId, long topicId, int queueId) { - return logicQueueManager.getOrCreate(StoreContext.EMPTY, topicId, queueId) - .thenApply(topicQueue -> topicQueue.getConsumeOffset(consumerGroupId)); + public long getConsumeOffset(long consumerGroupId, long topicId, int queueId) { + CompletableFuture> future = logicQueueManager.get(topicId, queueId); + if (future.isDone()) { + return future.join() + .map(topicQueue -> topicQueue.getConsumeOffset(consumerGroupId)) + .orElse(0L); + } + return 0; + } + + @Override + public long getRetryConsumeOffset(long consumerGroupId, long topicId, int queueId) { + CompletableFuture> future = logicQueueManager.get(topicId, queueId); + if (future.isDone()) { + return future.join() + .map(topicQueue -> topicQueue.getRetryConsumeOffset(consumerGroupId)) + .orElse(0L); + } + return 0; } @Override diff --git a/store/src/main/java/com/automq/rocketmq/store/api/LogicQueue.java b/store/src/main/java/com/automq/rocketmq/store/api/LogicQueue.java index 9ee0cc9f5..fbf66b6b2 100644 --- a/store/src/main/java/com/automq/rocketmq/store/api/LogicQueue.java +++ b/store/src/main/java/com/automq/rocketmq/store/api/LogicQueue.java @@ -17,6 +17,7 @@ package com.automq.rocketmq.store.api; +import apache.rocketmq.controller.v1.StreamRole; import com.automq.rocketmq.common.model.generated.FlatMessage; import com.automq.rocketmq.store.model.StoreContext; import com.automq.rocketmq.store.model.message.AckResult; @@ -26,6 +27,7 @@ import com.automq.rocketmq.store.model.message.PullResult; import com.automq.rocketmq.store.model.message.PutResult; import com.automq.rocketmq.store.model.message.ResetConsumeOffsetResult; +import java.util.List; import java.util.concurrent.CompletableFuture; public abstract class LogicQueue { @@ -72,7 +74,7 @@ public abstract CompletableFuture changeInvisible public abstract CompletableFuture resetConsumeOffset(long consumerGroupId, long offset); - public abstract CompletableFuture getOffsetRange(); + public abstract List getOffsetRange(long consumerGroupId); public abstract int getInflightStats(long consumerGroupId); @@ -102,6 +104,6 @@ public enum State { CLOSED } - public record QueueOffsetRange(long startOffset, long endOffset) { + public record StreamOffsetRange(long streamId, StreamRole streamRole, long startOffset, long endOffset) { } } diff --git a/store/src/main/java/com/automq/rocketmq/store/api/MessageStore.java b/store/src/main/java/com/automq/rocketmq/store/api/MessageStore.java index 4e4098df6..45ac0a1ce 100644 --- a/store/src/main/java/com/automq/rocketmq/store/api/MessageStore.java +++ b/store/src/main/java/com/automq/rocketmq/store/api/MessageStore.java @@ -30,6 +30,7 @@ import com.automq.rocketmq.store.model.message.PullResult; import com.automq.rocketmq.store.model.message.PutResult; import com.automq.rocketmq.store.model.message.ResetConsumeOffsetResult; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; @@ -108,11 +109,12 @@ CompletableFuture changeInvisibleDuration(String /** * Get offset range in queue. * - * @param topicId topic id - * @param queueId queue id - * @return offset range, [startOffset, endOffset) + * @param topicId topic id + * @param queueId queue id + * @param consumerGroupId consumer group id + * @return offset range, [startOffset, endOffset) for streams */ - CompletableFuture getOffsetRange(long topicId, int queueId); + List getOffsetRange(long topicId, int queueId, long consumerGroupId); /** * Get consume offset of specified consumer group. @@ -122,7 +124,17 @@ CompletableFuture changeInvisibleDuration(String * @param queueId queue id * @return consume offset */ - CompletableFuture getConsumeOffset(long consumerGroupId, long topicId, int queueId); + long getConsumeOffset(long consumerGroupId, long topicId, int queueId); + + /** + * Get retry offset of specified consumer group. + * + * @param consumerGroupId consumer group id + * @param topicId topic id + * @param queueId queue id + * @return ack offset + */ + long getRetryConsumeOffset(long consumerGroupId, long topicId, int queueId); /** * Reset consume offset of specified consumer group. diff --git a/store/src/main/java/com/automq/rocketmq/store/queue/StreamLogicQueue.java b/store/src/main/java/com/automq/rocketmq/store/queue/StreamLogicQueue.java index da6344a80..58d383c9c 100644 --- a/store/src/main/java/com/automq/rocketmq/store/queue/StreamLogicQueue.java +++ b/store/src/main/java/com/automq/rocketmq/store/queue/StreamLogicQueue.java @@ -17,6 +17,7 @@ package com.automq.rocketmq.store.queue; +import apache.rocketmq.controller.v1.StreamRole; import com.automq.rocketmq.common.config.StoreConfig; import com.automq.rocketmq.common.model.FlatMessageExt; import com.automq.rocketmq.common.model.generated.FlatMessage; @@ -569,8 +570,40 @@ public CompletableFuture resetConsumeOffset(long consu } @Override - public CompletableFuture getOffsetRange() { - throw new UnsupportedOperationException(); + public List getOffsetRange(long consumerGroupId) { + if (state.get() != LogicQueue.State.OPENED) { + return List.of(); + } + + List rangeList = new ArrayList<>(); + + StreamOffsetRange dataRange = new StreamOffsetRange(dataStreamId, StreamRole.STREAM_ROLE_DATA, + streamStore.startOffset(dataStreamId), streamStore.confirmOffset(dataStreamId)); + rangeList.add(dataRange); + + StreamOffsetRange operationRange = new StreamOffsetRange(operationStreamId, StreamRole.STREAM_ROLE_OPS, + streamStore.startOffset(operationStreamId), streamStore.confirmOffset(operationStreamId)); + rangeList.add(operationRange); + + StreamOffsetRange snapshotRange = new StreamOffsetRange(snapshotStreamId, StreamRole.STREAM_ROLE_SNAPSHOT, + streamStore.startOffset(snapshotStreamId), streamStore.confirmOffset(snapshotStreamId)); + rangeList.add(snapshotRange); + + if (consumerGroupId > 0) { + CompletableFuture retryStreamIdCf = retryStreamIdMap.get(consumerGroupId); + if (retryStreamIdCf != null) { + try { + long retryStreamId = retryStreamIdCf.get(); + StreamOffsetRange retryRange = new StreamOffsetRange(retryStreamId, StreamRole.STREAM_ROLE_RETRY, + streamStore.startOffset(retryStreamId), streamStore.confirmOffset(retryStreamId)); + rangeList.add(retryRange); + } catch (Exception e) { + LOGGER.error("Failed to get retry stream id for consumer group: {}", consumerGroupId, e); + } + } + } + + return rangeList; } @Override diff --git a/store/src/test/java/com/automq/rocketmq/store/MessageStoreTest.java b/store/src/test/java/com/automq/rocketmq/store/MessageStoreTest.java index 3c152bcc3..ac9c540f2 100644 --- a/store/src/test/java/com/automq/rocketmq/store/MessageStoreTest.java +++ b/store/src/test/java/com/automq/rocketmq/store/MessageStoreTest.java @@ -494,8 +494,8 @@ public void recover_all_operation() throws StoreException { ResetConsumeOffsetResult resetConsumeOffsetResult = logicQueue.resetConsumeOffset(CONSUMER_GROUP_ID, 0).join(); assertEquals(ResetConsumeOffsetResult.Status.SUCCESS, resetConsumeOffsetResult.status()); - Long offset = messageStore.getConsumeOffset(CONSUMER_GROUP_ID, TOPIC_ID, QUEUE_ID).join(); - assertEquals(0, offset.longValue()); + long offset = messageStore.getConsumeOffset(CONSUMER_GROUP_ID, TOPIC_ID, QUEUE_ID); + assertEquals(0, offset); // 7. pop 1 message with long invisible duration popResult = messageStore.pop(StoreContext.EMPTY, CONSUMER_GROUP_ID, TOPIC_ID, QUEUE_ID, Filter.DEFAULT_FILTER, 1, false, false, Long.MAX_VALUE).join(); @@ -513,8 +513,8 @@ public void recover_all_operation() throws StoreException { long nextVisibleTimestamp = System.currentTimeMillis() + 800; // check consume offset and retry message count before closing - offset = messageStore.getConsumeOffset(CONSUMER_GROUP_ID, TOPIC_ID, QUEUE_ID).join(); - assertEquals(2, offset.longValue()); + offset = messageStore.getConsumeOffset(CONSUMER_GROUP_ID, TOPIC_ID, QUEUE_ID); + assertEquals(2, offset); StreamMetadata retryStream = metadataService.retryStreamOf(CONSUMER_GROUP_ID, TOPIC_ID, QUEUE_ID).join(); assertFalse(streamStore.isOpened(retryStream.getStreamId())); @@ -543,8 +543,8 @@ public void recover_all_operation() throws StoreException { assertEquals(2, timerTagCount.get()); // check consume offset and retry message count after opening - offset = messageStore.getConsumeOffset(CONSUMER_GROUP_ID, TOPIC_ID, QUEUE_ID).join(); - assertEquals(2, offset.longValue()); + offset = messageStore.getConsumeOffset(CONSUMER_GROUP_ID, TOPIC_ID, QUEUE_ID); + assertEquals(2, offset); assertFalse(streamStore.isOpened(retryStream.getStreamId())); await().atMost(Duration.ofSeconds(5))