Skip to content

Commit

Permalink
feat(proxy): add topicStats rpc
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <admin@lv5.moe>
  • Loading branch information
ShadowySpirits committed Dec 6, 2023
1 parent 3c47c31 commit 20b67d9
Show file tree
Hide file tree
Showing 11 changed files with 284 additions and 32 deletions.
41 changes: 41 additions & 0 deletions proto/src/main/proto/proxy/proxy.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ syntax = "proto3";
package apache.rocketmq.proxy.v1;

import "common.proto";
import "controller/model.proto";

option java_multiple_files = true;
option java_package = "apache.rocketmq.proxy.v1";
Expand Down Expand Up @@ -45,8 +46,48 @@ message ResetConsumeOffsetByTimestampRequest {
int64 timestamp = 5;
}

message TopicStatsRequest {
ProxyRequestContext context = 1;
// Topic name, required
string topic = 2;
// Queue id, -1 means query all queue
int32 queue_id = 3;
// Consumer group name
string group = 4;
}

message StreamStats {
// Stream id
int64 stream_id = 1;
// Stream role
controller.v1.StreamRole role = 2;
// Min message offset
int64 min_offset = 3;
// Max message offset
int64 max_offset = 4;
// Consume offset, if consumer group name not specified, return -1
int64 consume_offset = 5;
}

message QueueStats {
// Queue id
int32 queue_id = 1;
// Stream stats which belong to this topic
repeated StreamStats stream_stats = 2;
}

message TopicStatsReply {
Status status = 1;
// Topic id
int64 id = 2;
// Topic name
string name = 3;
// Queue stats
repeated QueueStats queue_stats = 4;
}

service ProxyService {
rpc resetConsumeOffset(ResetConsumeOffsetRequest) returns (ResetConsumeOffsetReply) {}
rpc resetConsumeOffsetByTimestamp(ResetConsumeOffsetByTimestampRequest) returns (ResetConsumeOffsetReply) {}
rpc topicStats(TopicStatsRequest) returns (TopicStatsReply) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@

import apache.rocketmq.common.v1.Code;
import apache.rocketmq.proxy.v1.ProxyServiceGrpc;
import apache.rocketmq.proxy.v1.QueueStats;
import apache.rocketmq.proxy.v1.ResetConsumeOffsetByTimestampRequest;
import apache.rocketmq.proxy.v1.ResetConsumeOffsetRequest;
import apache.rocketmq.proxy.v1.ResetConsumeOffsetReply;
import apache.rocketmq.proxy.v1.ResetConsumeOffsetRequest;
import apache.rocketmq.proxy.v1.Status;
import apache.rocketmq.proxy.v1.TopicStatsReply;
import apache.rocketmq.proxy.v1.TopicStatsRequest;
import com.automq.rocketmq.proxy.service.ExtendMessageService;
import com.google.protobuf.TextFormat;
import io.grpc.stub.StreamObserver;
import java.util.List;
import org.slf4j.Logger;

public class ProxyServiceImpl extends ProxyServiceGrpc.ProxyServiceImplBase {
Expand Down Expand Up @@ -78,4 +82,30 @@ public void resetConsumeOffsetByTimestamp(ResetConsumeOffsetByTimestampRequest r
responseObserver.onCompleted();
});
}

@Override
public void topicStats(TopicStatsRequest request, StreamObserver<TopicStatsReply> responseObserver) {
messageService.getTopicStats(request.getTopic(), request.getQueueId(), request.getGroup())
.whenComplete((pair, e) -> {
if (e != null) {
responseObserver.onError(e);
return;
}

Long topicId = pair.getLeft();
List<QueueStats> queueStatsList = pair.getRight();
TopicStatsReply reply = TopicStatsReply.newBuilder()
.setStatus(Status
.newBuilder()
.setCode(Code.OK)
.build())
.setId(topicId)
.setName(request.getTopic())
.addAllQueueStats(queueStatsList)
.build();

responseObserver.onNext(reply);
responseObserver.onCompleted();
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package com.automq.rocketmq.proxy.service;

import apache.rocketmq.proxy.v1.QueueStats;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.tuple.Pair;

public interface ExtendMessageService {

Expand All @@ -41,4 +44,13 @@ public interface ExtendMessageService {
*/
CompletableFuture<Void> resetConsumeOffsetByTimestamp(String topic, int queueId, String consumerGroup, long timestamp);

/**
* Get the stats of the given topic.
*
* @param topic The topic name.
* @param queueId The queue id of the queue.
* @param consumerGroup The consumer group.
* @return The stats of the given topic.
*/
CompletableFuture<Pair<Long, List<QueueStats>>> getTopicStats(String topic, int queueId, String consumerGroup);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@

import apache.rocketmq.common.v1.Code;
import apache.rocketmq.controller.v1.ConsumerGroup;
import apache.rocketmq.controller.v1.StreamRole;
import apache.rocketmq.controller.v1.SubscriptionMode;
import apache.rocketmq.controller.v1.Topic;
import apache.rocketmq.proxy.v1.QueueStats;
import apache.rocketmq.proxy.v1.StreamStats;
import com.automq.rocketmq.common.config.ProxyConfig;
import com.automq.rocketmq.common.exception.ControllerException;
import com.automq.rocketmq.common.model.FlatMessageExt;
Expand All @@ -35,6 +38,7 @@
import com.automq.rocketmq.proxy.util.FlatMessageUtil;
import com.automq.rocketmq.proxy.util.ReceiptHandleUtil;
import com.automq.rocketmq.store.api.DeadLetterSender;
import com.automq.rocketmq.store.api.LogicQueue;
import com.automq.rocketmq.store.api.MessageStore;
import com.automq.rocketmq.store.exception.StoreException;
import com.automq.rocketmq.store.model.StoreContext;
Expand All @@ -53,6 +57,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand Down Expand Up @@ -561,7 +566,7 @@ public CompletableFuture<Long> queryConsumerOffset(ProxyContext ctx, Addressable
pair.getRight().getTopicId(),
virtualQueue.physicalQueueId());
}
return store.getConsumeOffset(consumerGroup.getGroupId(), topic.getTopicId(), virtualQueue.physicalQueueId());
return CompletableFuture.completedFuture(store.getConsumeOffset(consumerGroup.getGroupId(), topic.getTopicId(), virtualQueue.physicalQueueId()));
});
}

Expand Down Expand Up @@ -715,15 +720,103 @@ public CompletableFuture<Void> unlockBatchMQ(ProxyContext ctx, AddressableMessag
@Override
public CompletableFuture<Long> getMaxOffset(ProxyContext ctx, AddressableMessageQueue messageQueue,
GetMaxOffsetRequestHeader requestHeader, long timeoutMillis) {
// TODO: Support in the next iteration
throw new UnsupportedOperationException();
String topic = requestHeader.getTopic();
int queueId = requestHeader.getQueueId();

return topicOf(topic).thenApply(topicMetadata -> {
Optional<LogicQueue.StreamOffsetRange> dataStreamRange = store.getOffsetRange(topicMetadata.getTopicId(), queueId, -1)
.stream()
.filter(offsetRange -> offsetRange.streamRole() == StreamRole.STREAM_ROLE_DATA)
.findFirst();
if (dataStreamRange.isEmpty()) {
throw new ProxyException(apache.rocketmq.v2.Code.BAD_REQUEST, "Topic is not opened in this node.");
}
return dataStreamRange.get().endOffset();
});
}

@Override
public CompletableFuture<Long> getMinOffset(ProxyContext ctx, AddressableMessageQueue messageQueue,
GetMinOffsetRequestHeader requestHeader, long timeoutMillis) {
// TODO: Support in the next iteration
throw new UnsupportedOperationException();
String topic = requestHeader.getTopic();
int queueId = requestHeader.getQueueId();

return topicOf(topic).thenApply(topicMetadata -> {
Optional<LogicQueue.StreamOffsetRange> dataStreamRange = store.getOffsetRange(topicMetadata.getTopicId(), queueId, -1)
.stream()
.filter(offsetRange -> offsetRange.streamRole() == StreamRole.STREAM_ROLE_DATA)
.findFirst();
if (dataStreamRange.isEmpty()) {
throw new ProxyException(apache.rocketmq.v2.Code.BAD_REQUEST, "Topic is not opened in this node.");
}
return dataStreamRange.get().startOffset();
});
}

private long getConsumerOffset(ConsumerGroup consumerGroup, Topic topic, int queueId, boolean retry) {
if (consumerGroup.getSubMode() == SubscriptionMode.SUB_MODE_PULL) {
if (retry) {
topicOf(MixAll.RETRY_GROUP_TOPIC_PREFIX + consumerGroup.getName())
.thenCompose(retryTopic -> metadataService.consumerOffsetOf(consumerGroup.getGroupId(), retryTopic.getTopicId(), queueId));
return metadataService.consumerOffsetOf(consumerGroup.getGroupId(), topic.getTopicId(), queueId).join();
}
return metadataService.consumerOffsetOf(consumerGroup.getGroupId(), topic.getTopicId(), queueId).join();
}

if (retry) {
return store.getRetryConsumeOffset(consumerGroup.getGroupId(), topic.getTopicId(), queueId);
}
return store.getConsumeOffset(consumerGroup.getGroupId(), topic.getTopicId(), queueId);
}

private List<StreamStats> getStreamStats(Optional<ConsumerGroup> consumerGroup, Topic topic, int queueId) {
return store.getOffsetRange(topic.getTopicId(), queueId, consumerGroup.map(ConsumerGroup::getGroupId).orElse(-1L))
.stream().map(offsetRange -> {
long consumerOffset = 0;
if (consumerGroup.isPresent()) {
if (offsetRange.streamRole() == StreamRole.STREAM_ROLE_DATA) {
consumerOffset = getConsumerOffset(consumerGroup.get(), topic, queueId, false);
} else if (offsetRange.streamRole() == StreamRole.STREAM_ROLE_RETRY) {
consumerOffset = getConsumerOffset(consumerGroup.get(), topic, queueId, true);
}
}
return StreamStats.newBuilder()
.setStreamId(offsetRange.streamId())
.setMinOffset(offsetRange.startOffset())
.setMaxOffset(offsetRange.endOffset())
.setRole(offsetRange.streamRole())
.setConsumeOffset(consumerOffset)
.build();
}).toList();
}

@Override
public CompletableFuture<Pair<Long, List<QueueStats>>> getTopicStats(String topic, int queueId,
String consumerGroup) {
CompletableFuture<Optional<ConsumerGroup>> groupIdFuture;
if (StringUtils.isBlank(consumerGroup)) {
groupIdFuture = CompletableFuture.completedFuture(Optional.empty());
} else {
groupIdFuture = metadataService.consumerGroupOf(consumerGroup).thenApply(Optional::of);
}
return topicOf(topic)
.thenCombine(groupIdFuture, (topicMetadata, groupMetadata) -> {
long topicId = topicMetadata.getTopicId();
List<QueueStats> queueStatsList = new ArrayList<>();
if (queueId != -1) {
List<StreamStats> streamStatsList = getStreamStats(groupMetadata, topicMetadata, queueId);
queueStatsList.add(QueueStats.newBuilder().setQueueId(queueId).addAllStreamStats(streamStatsList).build());
} else {
int queueCount = topicMetadata.getCount();
for (int i = 0; i < queueCount; i++) {
List<StreamStats> streamStatsList = getStreamStats(groupMetadata, topicMetadata, i);
if (!streamStatsList.isEmpty()) {
queueStatsList.add(QueueStats.newBuilder().setQueueId(queueId).addAllStreamStats(streamStatsList).build());
}
}
}
return Pair.of(topicId, queueStatsList);
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.automq.rocketmq.proxy.mock;

import apache.rocketmq.controller.v1.StreamRole;
import com.automq.rocketmq.common.model.FlatMessageExt;
import com.automq.rocketmq.common.model.generated.FlatMessage;
import com.automq.rocketmq.store.api.LogicQueue;
Expand Down Expand Up @@ -159,19 +160,24 @@ public CompletableFuture<Integer> getInflightStats(long consumerGroupId, long to
}

@Override
public CompletableFuture<LogicQueue.QueueOffsetRange> getOffsetRange(long topicId, int queueId) {
public List<LogicQueue.StreamOffsetRange> getOffsetRange(long topicId, int queueId, long consumerGroupId) {
long startOffset = 0;
List<FlatMessageExt> messageList = messageMap.computeIfAbsent(topicId + queueId, v -> new ArrayList<>());
if (!messageList.isEmpty()) {
startOffset = messageList.get(0).offset();
}
long endOffset = offsetMap.computeIfAbsent(topicId + queueId, v -> new AtomicLong()).get();
return CompletableFuture.completedFuture(new LogicQueue.QueueOffsetRange(startOffset, endOffset));
return List.of(new LogicQueue.StreamOffsetRange(0, StreamRole.STREAM_ROLE_DATA, startOffset, endOffset));
}

@Override
public CompletableFuture<Long> getConsumeOffset(long consumerGroupId, long topicId, int queueId) {
return CompletableFuture.completedFuture(consumerOffsetMap.getOrDefault(Pair.of(topicId, queueId), 0L));
public long getConsumeOffset(long consumerGroupId, long topicId, int queueId) {
return consumerOffsetMap.getOrDefault(Pair.of(topicId, queueId), 0L);
}

@Override
public long getRetryConsumeOffset(long consumerGroupId, long topicId, int queueId) {
return 0;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ void popMessage() {
assertEquals(PopStatus.FOUND, result.getPopStatus());
assertEquals(2, result.getMsgFoundList().size());
// All messages in queue 0 has been consumed
assertEquals(2, messageStore.getConsumeOffset(consumerGroupId, topicId, 0).join());
assertEquals(2, messageStore.getConsumeOffset(consumerGroupId, topicId, 0));

// Pop again.
result = messageService.popMessage(ProxyContextExt.create(), messageQueue, header, 0L).join();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.annotations.SpanAttribute;
import io.opentelemetry.instrumentation.annotations.WithSpan;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
Expand Down Expand Up @@ -255,15 +257,36 @@ public CompletableFuture<Integer> getInflightStats(long consumerGroupId, long to
}

@Override
public CompletableFuture<LogicQueue.QueueOffsetRange> getOffsetRange(long topicId, int queueId) {
return logicQueueManager.getOrCreate(StoreContext.EMPTY, topicId, queueId)
.thenCompose(LogicQueue::getOffsetRange);
public List<LogicQueue.StreamOffsetRange> getOffsetRange(long topicId, int queueId, long consumerGroupId) {
CompletableFuture<Optional<LogicQueue>> future = logicQueueManager.get(topicId, queueId);
if (future.isDone()) {
return future.join()
.map(topicQueue -> topicQueue.getOffsetRange(consumerGroupId))
.orElse(List.of());
}
return List.of();
}

@Override
public CompletableFuture<Long> getConsumeOffset(long consumerGroupId, long topicId, int queueId) {
return logicQueueManager.getOrCreate(StoreContext.EMPTY, topicId, queueId)
.thenApply(topicQueue -> topicQueue.getConsumeOffset(consumerGroupId));
public long getConsumeOffset(long consumerGroupId, long topicId, int queueId) {
CompletableFuture<Optional<LogicQueue>> future = logicQueueManager.get(topicId, queueId);
if (future.isDone()) {
return future.join()
.map(topicQueue -> topicQueue.getConsumeOffset(consumerGroupId))
.orElse(0L);
}
return 0;
}

@Override
public long getRetryConsumeOffset(long consumerGroupId, long topicId, int queueId) {
CompletableFuture<Optional<LogicQueue>> future = logicQueueManager.get(topicId, queueId);
if (future.isDone()) {
return future.join()
.map(topicQueue -> topicQueue.getRetryConsumeOffset(consumerGroupId))
.orElse(0L);
}
return 0;
}

@Override
Expand Down
Loading

0 comments on commit 20b67d9

Please sign in to comment.