Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(proxy): add topicStats rpc #801

Merged
merged 1 commit into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions proto/src/main/proto/proxy/proxy.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ syntax = "proto3";
package apache.rocketmq.proxy.v1;

import "common.proto";
import "controller/model.proto";

option java_multiple_files = true;
option java_package = "apache.rocketmq.proxy.v1";
Expand Down Expand Up @@ -45,8 +46,48 @@ message ResetConsumeOffsetByTimestampRequest {
int64 timestamp = 5;
}

message TopicStatsRequest {
ProxyRequestContext context = 1;
// Topic name, required
string topic = 2;
// Queue id, -1 means query all queue
int32 queue_id = 3;
// Consumer group name
string group = 4;
}

message StreamStats {
// Stream id
int64 stream_id = 1;
// Stream role
controller.v1.StreamRole role = 2;
// Min message offset
int64 min_offset = 3;
// Max message offset
int64 max_offset = 4;
// Consume offset, if consumer group name not specified, return -1
int64 consume_offset = 5;
}

message QueueStats {
// Queue id
int32 queue_id = 1;
// Stream stats which belong to this topic
repeated StreamStats stream_stats = 2;
}

message TopicStatsReply {
Status status = 1;
// Topic id
int64 id = 2;
// Topic name
string name = 3;
// Queue stats
repeated QueueStats queue_stats = 4;
}

service ProxyService {
rpc resetConsumeOffset(ResetConsumeOffsetRequest) returns (ResetConsumeOffsetReply) {}
rpc resetConsumeOffsetByTimestamp(ResetConsumeOffsetByTimestampRequest) returns (ResetConsumeOffsetReply) {}
rpc topicStats(TopicStatsRequest) returns (TopicStatsReply) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@

import apache.rocketmq.common.v1.Code;
import apache.rocketmq.proxy.v1.ProxyServiceGrpc;
import apache.rocketmq.proxy.v1.QueueStats;
import apache.rocketmq.proxy.v1.ResetConsumeOffsetByTimestampRequest;
import apache.rocketmq.proxy.v1.ResetConsumeOffsetRequest;
import apache.rocketmq.proxy.v1.ResetConsumeOffsetReply;
import apache.rocketmq.proxy.v1.ResetConsumeOffsetRequest;
import apache.rocketmq.proxy.v1.Status;
import apache.rocketmq.proxy.v1.TopicStatsReply;
import apache.rocketmq.proxy.v1.TopicStatsRequest;
import com.automq.rocketmq.proxy.service.ExtendMessageService;
import com.google.protobuf.TextFormat;
import io.grpc.stub.StreamObserver;
import java.util.List;
import org.slf4j.Logger;

public class ProxyServiceImpl extends ProxyServiceGrpc.ProxyServiceImplBase {
Expand Down Expand Up @@ -78,4 +82,30 @@
responseObserver.onCompleted();
});
}

@Override
public void topicStats(TopicStatsRequest request, StreamObserver<TopicStatsReply> responseObserver) {
messageService.getTopicStats(request.getTopic(), request.getQueueId(), request.getGroup())
.whenComplete((pair, e) -> {

Check warning on line 89 in proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImpl.java#L88-L89

Added lines #L88 - L89 were not covered by tests
if (e != null) {
responseObserver.onError(e);
return;

Check warning on line 92 in proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImpl.java#L91-L92

Added lines #L91 - L92 were not covered by tests
}

Long topicId = pair.getLeft();
List<QueueStats> queueStatsList = pair.getRight();
TopicStatsReply reply = TopicStatsReply.newBuilder()
.setStatus(Status
.newBuilder()
.setCode(Code.OK)
.build())
.setId(topicId)
.setName(request.getTopic())
.addAllQueueStats(queueStatsList)
.build();

Check warning on line 105 in proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImpl.java#L95-L105

Added lines #L95 - L105 were not covered by tests

responseObserver.onNext(reply);
responseObserver.onCompleted();
});
}

Check warning on line 110 in proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImpl.java#L107-L110

Added lines #L107 - L110 were not covered by tests
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package com.automq.rocketmq.proxy.service;

import apache.rocketmq.proxy.v1.QueueStats;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.tuple.Pair;

public interface ExtendMessageService {

Expand All @@ -41,4 +44,13 @@ public interface ExtendMessageService {
*/
CompletableFuture<Void> resetConsumeOffsetByTimestamp(String topic, int queueId, String consumerGroup, long timestamp);

/**
* Get the stats of the given topic.
*
* @param topic The topic name.
* @param queueId The queue id of the queue.
* @param consumerGroup The consumer group.
* @return The stats of the given topic.
*/
CompletableFuture<Pair<Long, List<QueueStats>>> getTopicStats(String topic, int queueId, String consumerGroup);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@

import apache.rocketmq.common.v1.Code;
import apache.rocketmq.controller.v1.ConsumerGroup;
import apache.rocketmq.controller.v1.StreamRole;
import apache.rocketmq.controller.v1.SubscriptionMode;
import apache.rocketmq.controller.v1.Topic;
import apache.rocketmq.proxy.v1.QueueStats;
import apache.rocketmq.proxy.v1.StreamStats;
import com.automq.rocketmq.common.config.ProxyConfig;
import com.automq.rocketmq.common.exception.ControllerException;
import com.automq.rocketmq.common.model.FlatMessageExt;
Expand All @@ -35,6 +38,7 @@
import com.automq.rocketmq.proxy.util.FlatMessageUtil;
import com.automq.rocketmq.proxy.util.ReceiptHandleUtil;
import com.automq.rocketmq.store.api.DeadLetterSender;
import com.automq.rocketmq.store.api.LogicQueue;
import com.automq.rocketmq.store.api.MessageStore;
import com.automq.rocketmq.store.exception.StoreException;
import com.automq.rocketmq.store.model.StoreContext;
Expand All @@ -57,6 +61,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand Down Expand Up @@ -628,7 +633,7 @@
pair.getRight().getTopicId(),
virtualQueue.physicalQueueId());
}
return store.getConsumeOffset(consumerGroup.getGroupId(), topic.getTopicId(), virtualQueue.physicalQueueId());
return CompletableFuture.completedFuture(store.getConsumeOffset(consumerGroup.getGroupId(), topic.getTopicId(), virtualQueue.physicalQueueId()));
});
}

Expand Down Expand Up @@ -782,15 +787,103 @@
@Override
public CompletableFuture<Long> getMaxOffset(ProxyContext ctx, AddressableMessageQueue messageQueue,
GetMaxOffsetRequestHeader requestHeader, long timeoutMillis) {
// TODO: Support in the next iteration
throw new UnsupportedOperationException();
String topic = requestHeader.getTopic();
int queueId = requestHeader.getQueueId();

Check warning on line 791 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L790-L791

Added lines #L790 - L791 were not covered by tests

return topicOf(topic).thenApply(topicMetadata -> {
Optional<LogicQueue.StreamOffsetRange> dataStreamRange = store.getOffsetRange(topicMetadata.getTopicId(), queueId, -1)
.stream()

Check warning on line 795 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L793-L795

Added lines #L793 - L795 were not covered by tests
.filter(offsetRange -> offsetRange.streamRole() == StreamRole.STREAM_ROLE_DATA)
.findFirst();

Check warning on line 797 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L797

Added line #L797 was not covered by tests
if (dataStreamRange.isEmpty()) {
throw new ProxyException(apache.rocketmq.v2.Code.BAD_REQUEST, "Topic is not opened in this node.");

Check warning on line 799 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L799

Added line #L799 was not covered by tests
}
return dataStreamRange.get().endOffset();

Check warning on line 801 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L801

Added line #L801 was not covered by tests
});
}

@Override
public CompletableFuture<Long> getMinOffset(ProxyContext ctx, AddressableMessageQueue messageQueue,
GetMinOffsetRequestHeader requestHeader, long timeoutMillis) {
// TODO: Support in the next iteration
throw new UnsupportedOperationException();
String topic = requestHeader.getTopic();
int queueId = requestHeader.getQueueId();

Check warning on line 809 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L808-L809

Added lines #L808 - L809 were not covered by tests

return topicOf(topic).thenApply(topicMetadata -> {
Optional<LogicQueue.StreamOffsetRange> dataStreamRange = store.getOffsetRange(topicMetadata.getTopicId(), queueId, -1)
.stream()

Check warning on line 813 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L811-L813

Added lines #L811 - L813 were not covered by tests
.filter(offsetRange -> offsetRange.streamRole() == StreamRole.STREAM_ROLE_DATA)
.findFirst();

Check warning on line 815 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L815

Added line #L815 was not covered by tests
if (dataStreamRange.isEmpty()) {
throw new ProxyException(apache.rocketmq.v2.Code.BAD_REQUEST, "Topic is not opened in this node.");

Check warning on line 817 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L817

Added line #L817 was not covered by tests
}
return dataStreamRange.get().startOffset();

Check warning on line 819 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L819

Added line #L819 was not covered by tests
});
}

private long getConsumerOffset(ConsumerGroup consumerGroup, Topic topic, int queueId, boolean retry) {
if (consumerGroup.getSubMode() == SubscriptionMode.SUB_MODE_PULL) {
if (retry) {
topicOf(MixAll.RETRY_GROUP_TOPIC_PREFIX + consumerGroup.getName())
.thenCompose(retryTopic -> metadataService.consumerOffsetOf(consumerGroup.getGroupId(), retryTopic.getTopicId(), queueId));
return metadataService.consumerOffsetOf(consumerGroup.getGroupId(), topic.getTopicId(), queueId).join();

Check warning on line 828 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L826-L828

Added lines #L826 - L828 were not covered by tests
}
return metadataService.consumerOffsetOf(consumerGroup.getGroupId(), topic.getTopicId(), queueId).join();

Check warning on line 830 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L830

Added line #L830 was not covered by tests
}

if (retry) {
return store.getRetryConsumeOffset(consumerGroup.getGroupId(), topic.getTopicId(), queueId);

Check warning on line 834 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L834

Added line #L834 was not covered by tests
}
return store.getConsumeOffset(consumerGroup.getGroupId(), topic.getTopicId(), queueId);

Check warning on line 836 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L836

Added line #L836 was not covered by tests
}

private List<StreamStats> getStreamStats(Optional<ConsumerGroup> consumerGroup, Topic topic, int queueId) {
return store.getOffsetRange(topic.getTopicId(), queueId, consumerGroup.map(ConsumerGroup::getGroupId).orElse(-1L))
.stream().map(offsetRange -> {
long consumerOffset = 0;

Check warning on line 842 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L840-L842

Added lines #L840 - L842 were not covered by tests
if (consumerGroup.isPresent()) {
if (offsetRange.streamRole() == StreamRole.STREAM_ROLE_DATA) {
consumerOffset = getConsumerOffset(consumerGroup.get(), topic, queueId, false);

Check warning on line 845 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L845

Added line #L845 was not covered by tests
} else if (offsetRange.streamRole() == StreamRole.STREAM_ROLE_RETRY) {
consumerOffset = getConsumerOffset(consumerGroup.get(), topic, queueId, true);

Check warning on line 847 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L847

Added line #L847 was not covered by tests
}
}
return StreamStats.newBuilder()
.setStreamId(offsetRange.streamId())
.setMinOffset(offsetRange.startOffset())
.setMaxOffset(offsetRange.endOffset())
.setRole(offsetRange.streamRole())
.setConsumeOffset(consumerOffset)
.build();
}).toList();

Check warning on line 857 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L850-L857

Added lines #L850 - L857 were not covered by tests
}

@Override
public CompletableFuture<Pair<Long, List<QueueStats>>> getTopicStats(String topic, int queueId,
String consumerGroup) {
CompletableFuture<Optional<ConsumerGroup>> groupIdFuture;
if (StringUtils.isBlank(consumerGroup)) {
groupIdFuture = CompletableFuture.completedFuture(Optional.empty());
} else {
groupIdFuture = metadataService.consumerGroupOf(consumerGroup).thenApply(Optional::of);

Check warning on line 867 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L865-L867

Added lines #L865 - L867 were not covered by tests
}
return topicOf(topic)
.thenCombine(groupIdFuture, (topicMetadata, groupMetadata) -> {
long topicId = topicMetadata.getTopicId();
List<QueueStats> queueStatsList = new ArrayList<>();

Check warning on line 872 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L869-L872

Added lines #L869 - L872 were not covered by tests
if (queueId != -1) {
List<StreamStats> streamStatsList = getStreamStats(groupMetadata, topicMetadata, queueId);
queueStatsList.add(QueueStats.newBuilder().setQueueId(queueId).addAllStreamStats(streamStatsList).build());
} else {
int queueCount = topicMetadata.getCount();

Check warning on line 877 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L874-L877

Added lines #L874 - L877 were not covered by tests
for (int i = 0; i < queueCount; i++) {
List<StreamStats> streamStatsList = getStreamStats(groupMetadata, topicMetadata, i);

Check warning on line 879 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L879

Added line #L879 was not covered by tests
if (!streamStatsList.isEmpty()) {
queueStatsList.add(QueueStats.newBuilder().setQueueId(queueId).addAllStreamStats(streamStatsList).build());

Check warning on line 881 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L881

Added line #L881 was not covered by tests
}
}
}
return Pair.of(topicId, queueStatsList);

Check warning on line 885 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L885

Added line #L885 was not covered by tests
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.automq.rocketmq.proxy.mock;

import apache.rocketmq.controller.v1.StreamRole;
import com.automq.rocketmq.common.model.FlatMessageExt;
import com.automq.rocketmq.common.model.generated.FlatMessage;
import com.automq.rocketmq.store.api.LogicQueue;
Expand Down Expand Up @@ -160,19 +161,24 @@ public CompletableFuture<Integer> getInflightStats(long consumerGroupId, long to
}

@Override
public CompletableFuture<LogicQueue.QueueOffsetRange> getOffsetRange(long topicId, int queueId) {
public List<LogicQueue.StreamOffsetRange> getOffsetRange(long topicId, int queueId, long consumerGroupId) {
long startOffset = 0;
List<FlatMessageExt> messageList = messageMap.computeIfAbsent(topicId + queueId, v -> new ArrayList<>());
if (!messageList.isEmpty()) {
startOffset = messageList.get(0).offset();
}
long endOffset = offsetMap.computeIfAbsent(topicId + queueId, v -> new AtomicLong()).get();
return CompletableFuture.completedFuture(new LogicQueue.QueueOffsetRange(startOffset, endOffset));
return List.of(new LogicQueue.StreamOffsetRange(0, StreamRole.STREAM_ROLE_DATA, startOffset, endOffset));
}

@Override
public CompletableFuture<Long> getConsumeOffset(long consumerGroupId, long topicId, int queueId) {
return CompletableFuture.completedFuture(consumerOffsetMap.getOrDefault(Pair.of(topicId, queueId), 0L));
public long getConsumeOffset(long consumerGroupId, long topicId, int queueId) {
return consumerOffsetMap.getOrDefault(Pair.of(topicId, queueId), 0L);
}

@Override
public long getRetryConsumeOffset(long consumerGroupId, long topicId, int queueId) {
return 0;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ void popMessage() {
assertEquals(PopStatus.FOUND, result.getPopStatus());
assertEquals(2, result.getMsgFoundList().size());
// All messages in queue 0 has been consumed
assertEquals(2, messageStore.getConsumeOffset(consumerGroupId, topicId, 0).join());
assertEquals(2, messageStore.getConsumeOffset(consumerGroupId, topicId, 0));

// Pop again.
result = messageService.popMessage(ProxyContextExt.create(), messageQueue, header, 0L).join();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.annotations.SpanAttribute;
import io.opentelemetry.instrumentation.annotations.WithSpan;
import java.util.List;
import java.util.Optional;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -258,15 +260,36 @@
}

@Override
public CompletableFuture<LogicQueue.QueueOffsetRange> getOffsetRange(long topicId, int queueId) {
return logicQueueManager.getOrCreate(StoreContext.EMPTY, topicId, queueId)
.thenCompose(LogicQueue::getOffsetRange);
public List<LogicQueue.StreamOffsetRange> getOffsetRange(long topicId, int queueId, long consumerGroupId) {
CompletableFuture<Optional<LogicQueue>> future = logicQueueManager.get(topicId, queueId);

Check warning on line 264 in store/src/main/java/com/automq/rocketmq/store/MessageStoreImpl.java

View check run for this annotation

Codecov / codecov/patch

store/src/main/java/com/automq/rocketmq/store/MessageStoreImpl.java#L264

Added line #L264 was not covered by tests
if (future.isDone()) {
return future.join()
.map(topicQueue -> topicQueue.getOffsetRange(consumerGroupId))
.orElse(List.of());

Check warning on line 268 in store/src/main/java/com/automq/rocketmq/store/MessageStoreImpl.java

View check run for this annotation

Codecov / codecov/patch

store/src/main/java/com/automq/rocketmq/store/MessageStoreImpl.java#L266-L268

Added lines #L266 - L268 were not covered by tests
}
return List.of();

Check warning on line 270 in store/src/main/java/com/automq/rocketmq/store/MessageStoreImpl.java

View check run for this annotation

Codecov / codecov/patch

store/src/main/java/com/automq/rocketmq/store/MessageStoreImpl.java#L270

Added line #L270 was not covered by tests
}

@Override
public CompletableFuture<Long> getConsumeOffset(long consumerGroupId, long topicId, int queueId) {
return logicQueueManager.getOrCreate(StoreContext.EMPTY, topicId, queueId)
.thenApply(topicQueue -> topicQueue.getConsumeOffset(consumerGroupId));
public long getConsumeOffset(long consumerGroupId, long topicId, int queueId) {
CompletableFuture<Optional<LogicQueue>> future = logicQueueManager.get(topicId, queueId);
if (future.isDone()) {
return future.join()
.map(topicQueue -> topicQueue.getConsumeOffset(consumerGroupId))
.orElse(0L);
}
return 0;

Check warning on line 281 in store/src/main/java/com/automq/rocketmq/store/MessageStoreImpl.java

View check run for this annotation

Codecov / codecov/patch

store/src/main/java/com/automq/rocketmq/store/MessageStoreImpl.java#L281

Added line #L281 was not covered by tests
}

@Override
public long getRetryConsumeOffset(long consumerGroupId, long topicId, int queueId) {
CompletableFuture<Optional<LogicQueue>> future = logicQueueManager.get(topicId, queueId);

Check warning on line 286 in store/src/main/java/com/automq/rocketmq/store/MessageStoreImpl.java

View check run for this annotation

Codecov / codecov/patch

store/src/main/java/com/automq/rocketmq/store/MessageStoreImpl.java#L286

Added line #L286 was not covered by tests
if (future.isDone()) {
return future.join()
.map(topicQueue -> topicQueue.getRetryConsumeOffset(consumerGroupId))
.orElse(0L);

Check warning on line 290 in store/src/main/java/com/automq/rocketmq/store/MessageStoreImpl.java

View check run for this annotation

Codecov / codecov/patch

store/src/main/java/com/automq/rocketmq/store/MessageStoreImpl.java#L288-L290

Added lines #L288 - L290 were not covered by tests
}
return 0;

Check warning on line 292 in store/src/main/java/com/automq/rocketmq/store/MessageStoreImpl.java

View check run for this annotation

Codecov / codecov/patch

store/src/main/java/com/automq/rocketmq/store/MessageStoreImpl.java#L292

Added line #L292 was not covered by tests
}

@Override
Expand Down
Loading
Loading