diff --git a/store/src/main/java/com/automq/rocketmq/store/queue/StreamLogicQueue.java b/store/src/main/java/com/automq/rocketmq/store/queue/StreamLogicQueue.java
index 82ab006df..da6344a80 100644
--- a/store/src/main/java/com/automq/rocketmq/store/queue/StreamLogicQueue.java
+++ b/store/src/main/java/com/automq/rocketmq/store/queue/StreamLogicQueue.java
@@ -385,8 +385,14 @@ public CompletableFuture<PopResult> popRetry(StoreContext context, long consumer
         return retryStreamIdFuture.thenCompose(retryStreamId -> pop(context, consumerGroupId, retryStreamId, offset, PopOperation.PopOperationType.POP_RETRY, filter, batchSize, invisibleDuration));
     }

+    record FetchResult(List<FlatMessageExt> messageList, long endOffset) {
+        public int size() {
+            return messageList.size();
+        }
+    }
+
     @WithSpan(kind = SpanKind.SERVER)
-    private CompletableFuture<List<FlatMessageExt>> fetchMessages(StoreContext context, @SpanAttribute long streamId,
+    private CompletableFuture<FetchResult> fetchMessages(StoreContext context, @SpanAttribute long streamId,
         @SpanAttribute long offset, @SpanAttribute int batchSize) {
         long startOffset = streamStore.startOffset(streamId);
         if (offset < startOffset) {
@@ -395,13 +401,14 @@ private CompletableFuture<List<FlatMessageExt>> fetchMessages(StoreContext conte

         long confirmOffset = streamStore.confirmOffset(streamId);
         if (offset >= confirmOffset) {
-            return CompletableFuture.completedFuture(Collections.emptyList());
+            return CompletableFuture.completedFuture(new FetchResult(Collections.emptyList(), confirmOffset));
         }

         if (offset + batchSize > confirmOffset) {
             batchSize = (int) (confirmOffset - offset);
         }

+        long finalOffset = offset;
         return streamStore.fetch(context, streamId, offset, batchSize)
             .thenApply(fetchResult -> {
                 AtomicLong fetchBytes = new AtomicLong();
@@ -422,7 +429,7 @@ private CompletableFuture<List<FlatMessageExt>> fetchMessages(StoreContext conte

                 context.span().ifPresent(span -> span.setAttribute("fetchBytes", fetchBytes.get()));

-                return resultList;
+                return new FetchResult(resultList, finalOffset + resultList.size());
             });
     }

@@ -436,7 +443,7 @@ private CompletableFuture<FilterFetchResult> fetchAndFilterMessages(StoreContext
         return fetchMessages(context, streamId, offset, fetchBatchSize)
             .thenCompose(fetchResult -> {
                 // Add filter result to message list.
-                List<FlatMessageExt> matchedMessageList = filter.doFilter(fetchResult);
+                List<FlatMessageExt> matchedMessageList = filter.doFilter(fetchResult.messageList());
                 // Update end offset
                 int index = batchSize - result.size();
                 if (matchedMessageList.size() > index) {
@@ -444,7 +451,7 @@ private CompletableFuture<FilterFetchResult> fetchAndFilterMessages(StoreContext
                     result.setEndOffset(messageExt.offset());
                     result.addMessageList(matchedMessageList.subList(0, index));
                 } else {
-                    result.setEndOffset(offset + fetchResult.size());
+                    result.setEndOffset(fetchResult.endOffset());
                     result.addMessageList(matchedMessageList);
                 }
                 // If not enough messages after applying filter, fetch more messages.
@@ -452,7 +459,7 @@ private CompletableFuture<FilterFetchResult> fetchAndFilterMessages(StoreContext
                 boolean hasMoreMessages = fetchResult.size() >= fetchBatchSize;
                 int newFetchCount = fetchCount + fetchResult.size();
-                long newFetchBytes = fetchBytes + fetchResult.stream()
+                long newFetchBytes = fetchBytes + fetchResult.messageList().stream()
                     .map(messageExt -> (long) messageExt.message().getByteBuffer().limit())
                     .reduce(0L, Long::sum);

                 boolean notExceedLimit = newFetchCount < config.maxFetchCount() &&
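
For readers skimming the patch, the sketch below restates the pattern the new record introduces: fetchMessages now returns the fetched batch together with the end offset it already computed (the confirm offset when nothing is readable, otherwise the fetch offset plus the number of records returned), so fetchAndFilterMessages can take the resume point from the result instead of recomputing "offset + size". This is a standalone, minimal sketch only; the FetchResultSketch class, the String payload standing in for FlatMessageExt, and the fakeFetch helper are illustrative assumptions, not the repository's API.

import java.util.Collections;
import java.util.List;

public class FetchResultSketch {

    // Same shape as the record added in the diff: the fetched batch plus the
    // offset at which the next fetch should continue.
    record FetchResult(List<String> messageList, long endOffset) {
        public int size() {
            return messageList.size();
        }
    }

    // Hypothetical fetch: trims the batch to the confirm offset and reports the
    // resulting end offset itself, so callers no longer recompute it.
    static FetchResult fakeFetch(List<String> stream, long offset, long confirmOffset, int batchSize) {
        if (offset >= confirmOffset) {
            // Nothing readable yet; the confirm offset is the authoritative boundary.
            return new FetchResult(Collections.emptyList(), confirmOffset);
        }
        long end = Math.min(offset + batchSize, confirmOffset);
        List<String> batch = stream.subList((int) offset, (int) end);
        return new FetchResult(batch, end);
    }

    public static void main(String[] args) {
        List<String> stream = List.of("m0", "m1", "m2", "m3");
        FetchResult result = fakeFetch(stream, 1, 3, 10);
        // The caller reads the resume point directly from the result.
        System.out.println(result.messageList() + " next=" + result.endOffset());
        // Prints: [m1, m2] next=3
    }
}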