From 0d76801d95a444219401ea13dc841bab02d340dc Mon Sep 17 00:00:00 2001 From: SSpirits Date: Tue, 19 Dec 2023 17:44:32 +0800 Subject: [PATCH] feat(proxy): add metrics and trace for remoting send and pull RPC Signed-off-by: SSpirits --- .../rocketmq/proxy/model/ProxyContextExt.java | 8 ++++ .../activity/ExtendPullMessageActivity.java | 26 ++++++++++++- .../activity/ExtendSendMessageActivity.java | 37 ++++++++++++++++++- .../proxy/service/MessageServiceImpl.java | 19 ++++++++-- .../rocketmq/proxy/mock/MockMessageStore.java | 3 +- .../rocketmq/store/MessageStoreImpl.java | 10 +++-- .../automq/rocketmq/store/api/LogicQueue.java | 6 ++- .../rocketmq/store/api/MessageStore.java | 3 +- .../store/queue/StreamLogicQueue.java | 17 ++++++--- .../rocketmq/store/service/ReviveService.java | 4 +- .../store/service/ReviveServiceTest.java | 4 +- 11 files changed, 113 insertions(+), 24 deletions(-) diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/model/ProxyContextExt.java b/proxy/src/main/java/com/automq/rocketmq/proxy/model/ProxyContextExt.java index 36acbaa0e..ed1b1d727 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/model/ProxyContextExt.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/model/ProxyContextExt.java @@ -35,6 +35,7 @@ public class ProxyContextExt extends ProxyContext implements TraceContext { private boolean relayed; private final Tracer tracer; + private Span rootSpan; private final BlockingDeque spanStack = new LinkedBlockingDeque<>(); @@ -78,6 +79,10 @@ public Long getRemainingMs() { return super.getRemainingMs() == null ? DEFAULT_TIMEOUT_MILLIS : super.getRemainingMs(); } + public Span rootSpan() { + return rootSpan; + } + public BlockingDeque spanStack() { return spanStack; } @@ -94,6 +99,9 @@ public Optional span() { @Override public void attachSpan(Span span) { + if (spanStack.isEmpty()) { + rootSpan = span; + } spanStack.push(span); } diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendPullMessageActivity.java b/proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendPullMessageActivity.java index 7baf37e2e..2ac8a867e 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendPullMessageActivity.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendPullMessageActivity.java @@ -17,10 +17,15 @@ package com.automq.rocketmq.proxy.remoting.activity; +import com.automq.rocketmq.common.trace.TraceHelper; import com.automq.rocketmq.proxy.exception.ExceptionHandler; +import com.automq.rocketmq.proxy.model.ProxyContextExt; import com.automq.rocketmq.proxy.remoting.RemotingUtil; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.FileRegion; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.Tracer; import java.nio.ByteBuffer; import java.util.List; import java.util.Objects; @@ -35,10 +40,12 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.sysflag.PullSysFlag; +import org.apache.rocketmq.proxy.common.ContextVariable; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.processor.MessagingProcessor; import org.apache.rocketmq.proxy.remoting.activity.PullMessageActivity; import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; +import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.protocol.ForbiddenType; import org.apache.rocketmq.remoting.protocol.RemotingCommand; @@ -95,11 +102,26 @@ protected ProxyContext createContext(ChannelHandlerContext ctx, RemotingCommand protected void writeResponse(ChannelHandlerContext ctx, ProxyContext context, RemotingCommand request, RemotingCommand response, Throwable t) { recordRpcLatency(context, response); + ProxyContextExt contextExt = (ProxyContextExt) context; + Span rootSpan = contextExt.rootSpan(); + rootSpan.setAttribute("code", RemotingHelper.getResponseCodeDesc(response.getCode())); + TraceHelper.endSpan(contextExt, rootSpan, t); super.writeResponse(ctx, context, request, response, t); } private RemotingCommand pullMessage(ChannelHandlerContext ctx, RemotingCommand request, ProxyContext context) throws RemotingCommandException { + ProxyContextExt contextExt = (ProxyContextExt) context; + Tracer tracer = contextExt.tracer().get(); + Span rootSpan = tracer.spanBuilder("PullMessage") + .setNoParent() + .setSpanKind(SpanKind.SERVER) + .setAttribute(ContextVariable.PROTOCOL_TYPE, contextExt.getProtocolType()) + .setAttribute(ContextVariable.ACTION, contextExt.getAction()) + .setAttribute(ContextVariable.CLIENT_ID, contextExt.getClientID()) + .startSpan(); + contextExt.attachSpan(rootSpan); + // Retrieve the request header. final PullMessageRequestHeader requestHeader = (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class); @@ -234,13 +256,15 @@ private RemotingCommand pullMessage(ChannelHandlerContext ctx, RemotingCommand r ctx.writeAndFlush(fileRegion) .addListener(future -> { recordRpcLatency(context, response); + rootSpan.setAttribute("code", RemotingHelper.getResponseCodeDesc(response.getCode())); + TraceHelper.endSpan(contextExt, rootSpan, future.cause()); if (!future.isSuccess()) { LOGGER.error("Write pull message response failed", future.cause()); } }); return; } - writeResponse(ctx, context, request, response); + writeResponse(ctx, context, request, response, null); }); return null; diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendSendMessageActivity.java b/proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendSendMessageActivity.java index 10b8e0ef9..0255600fb 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendSendMessageActivity.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendSendMessageActivity.java @@ -17,10 +17,15 @@ package com.automq.rocketmq.proxy.remoting.activity; +import com.automq.rocketmq.common.trace.TraceHelper; import com.automq.rocketmq.proxy.exception.ExceptionHandler; +import com.automq.rocketmq.proxy.model.ProxyContextExt; import com.automq.rocketmq.proxy.processor.ExtendMessagingProcessor; import com.automq.rocketmq.proxy.remoting.RemotingUtil; import io.netty.channel.ChannelHandlerContext; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.Tracer; import java.net.InetSocketAddress; import java.util.Collections; import java.util.Map; @@ -32,12 +37,14 @@ import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.topic.TopicValidator; +import org.apache.rocketmq.proxy.common.ContextVariable; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.processor.QueueSelector; import org.apache.rocketmq.proxy.remoting.activity.SendMessageActivity; import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue; import org.apache.rocketmq.proxy.service.route.MessageQueueView; +import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RequestCode; import org.apache.rocketmq.remoting.protocol.ResponseCode; @@ -77,6 +84,17 @@ protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCom @Override protected RemotingCommand sendMessage(ChannelHandlerContext ctx, RemotingCommand request, ProxyContext context) throws Exception { + ProxyContextExt contextExt = (ProxyContextExt) context; + Tracer tracer = contextExt.tracer().get(); + Span rootSpan = tracer.spanBuilder("SendMessage") + .setNoParent() + .setSpanKind(SpanKind.SERVER) + .setAttribute(ContextVariable.PROTOCOL_TYPE, contextExt.getProtocolType()) + .setAttribute(ContextVariable.ACTION, contextExt.getAction()) + .setAttribute(ContextVariable.CLIENT_ID, contextExt.getClientID()) + .startSpan(); + contextExt.attachSpan(rootSpan); + // The parent class already checked the message type, added the transaction subscription. RemotingCommand superResponse = super.sendMessage(ctx, request, context); if (superResponse != null) { @@ -136,7 +154,7 @@ protected RemotingCommand sendMessage(ChannelHandlerContext ctx, RemotingCommand // TODO: Support batch message in the future. SendResult sendResult = sendResults.get(0); fillSendMessageResponse(response, sendResult); - writeResponse(ctx, context, request, response); + writeResponse(ctx, context, request, response, null); }); // Return null to uplevel, the response will be sent back in the future. @@ -146,6 +164,17 @@ protected RemotingCommand sendMessage(ChannelHandlerContext ctx, RemotingCommand @Override protected RemotingCommand consumerSendMessage(ChannelHandlerContext ctx, RemotingCommand request, ProxyContext context) throws Exception { + ProxyContextExt contextExt = (ProxyContextExt) context; + Tracer tracer = contextExt.tracer().get(); + Span rootSpan = tracer.spanBuilder("ConsumerSendMessage") + .setNoParent() + .setSpanKind(SpanKind.SERVER) + .setAttribute(ContextVariable.PROTOCOL_TYPE, contextExt.getProtocolType()) + .setAttribute(ContextVariable.ACTION, contextExt.getAction()) + .setAttribute(ContextVariable.CLIENT_ID, contextExt.getClientID()) + .startSpan(); + contextExt.attachSpan(rootSpan); + ConsumerSendMsgBackRequestHeader requestHeader = (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class); messagingProcessor.getServiceManager() .getMessageService() @@ -155,7 +184,7 @@ protected RemotingCommand consumerSendMessage(ChannelHandlerContext ctx, Remotin writeErrResponse(ctx, context, request, error); return; } - writeResponse(ctx, context, request, finalResponse); + writeResponse(ctx, context, request, finalResponse, null); }); return null; @@ -178,6 +207,10 @@ protected ProxyContext createContext(ChannelHandlerContext ctx, RemotingCommand protected void writeResponse(ChannelHandlerContext ctx, ProxyContext context, RemotingCommand request, RemotingCommand response, Throwable t) { recordRpcLatency(context, response); + ProxyContextExt contextExt = (ProxyContextExt) context; + Span rootSpan = contextExt.rootSpan(); + rootSpan.setAttribute("code", RemotingHelper.getResponseCodeDesc(response.getCode())); + TraceHelper.endSpan(contextExt, rootSpan, t); super.writeResponse(ctx, context, request, response, t); } diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java b/proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java index c70617760..4daa09437 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java @@ -323,7 +323,7 @@ public CompletableFuture sendMessageBack(ProxyContext ctx, Rece Topic topic = pair.getLeft(); ConsumerGroup group = pair.getRight(); - return store.pull(group.getGroupId(), topic.getTopicId(), virtualQueue.physicalQueueId(), + return store.pull(StoreContext.EMPTY, group.getGroupId(), topic.getTopicId(), virtualQueue.physicalQueueId(), Filter.DEFAULT_FILTER, requestHeader.getOffset(), 1, false) .thenApply(pullResult -> { if (pullResult.status() == com.automq.rocketmq.store.model.message.PullResult.Status.FOUND) { @@ -809,17 +809,21 @@ public CompletableFuture pullMessage(ProxyContext ctx, AddressableMe metadataService.updateConsumerOffset(group.getGroupId(), topic.getTopicId(), virtualQueue.physicalQueueId(), requestHeader.getCommitOffset()); } - return store.pull(group.getGroupId(), topic.getTopicId(), virtualQueue.physicalQueueId(), filter, requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), false); + StoreContext storeContext = ContextUtil.buildStoreContext(ctx, topic.getName(), group.getName()); + return store.pull(storeContext, group.getGroupId(), topic.getTopicId(), virtualQueue.physicalQueueId(), filter, requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), false); }) .thenCompose(result -> { + Topic topic = topicReference.get(); + ConsumerGroup group = consumerGroupReference.get(); if (result.messageList().isEmpty()) { if (result.maxOffset() - result.nextBeginOffset() > 0) { // This means there are messages in the queue but not match the filter. So we should prevent long polling. return CompletableFuture.completedFuture(new PullResult(PullStatus.NO_MATCHED_MSG, result.nextBeginOffset(), result.minOffset(), result.maxOffset(), Collections.emptyList())); } else { + StoreContext storeContext = ContextUtil.buildStoreContext(ctx, topic.getName(), group.getName()); return suspendRequestService.suspendRequest((ProxyContextExt) ctx, requestHeader.getTopic(), virtualQueue.physicalQueueId(), filter, timeoutMillis, // Function to pull message later. - timeout -> store.pull(consumerGroupReference.get().getGroupId(), topicReference.get().getTopicId(), virtualQueue.physicalQueueId(), filter, + timeout -> store.pull(storeContext, group.getGroupId(), topic.getTopicId(), virtualQueue.physicalQueueId(), filter, requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), false) .thenApply(PullResultWrapper::new)) .thenApply(resultWrapper -> { @@ -827,11 +831,14 @@ public CompletableFuture pullMessage(ProxyContext ctx, AddressableMe return new PullResult(PullStatus.NO_MATCHED_MSG, result.nextBeginOffset(), result.minOffset(), result.maxOffset(), Collections.emptyList()); } com.automq.rocketmq.store.model.message.PullResult suspendResult = resultWrapper.get().inner(); + recordMetricsForPulling(topic.getName(), group.getName(), suspendResult); return new PullResult(PullStatus.FOUND, suspendResult.nextBeginOffset(), suspendResult.minOffset(), suspendResult.maxOffset(), FlatMessageUtil.convertTo(null, suspendResult.messageList(), requestHeader.getTopic(), 0, config.hostName(), config.remotingListenPort())); }); } } + + recordMetricsForPulling(topic.getName(), group.getName(), result); return CompletableFuture.completedFuture( new PullResult(PullStatus.FOUND, result.nextBeginOffset(), result.minOffset(), result.maxOffset(), FlatMessageUtil.convertTo(null, result.messageList(), requestHeader.getTopic(), 0, config.hostName(), config.remotingListenPort())) @@ -839,6 +846,12 @@ public CompletableFuture pullMessage(ProxyContext ctx, AddressableMe }); } + private void recordMetricsForPulling(String topic, String group, + com.automq.rocketmq.store.model.message.PullResult result) { + Integer messageBytesTotal = result.messageList().stream().map(message -> message.message().payloadAsByteBuffer().remaining()).reduce(0, Integer::sum); + ProxyMetricsManager.recordOutgoingMessages(topic, group, result.messageList().size(), messageBytesTotal, topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)); + } + @Override public CompletableFuture> lockBatchMQ(ProxyContext ctx, AddressableMessageQueue messageQueue, LockBatchRequestBody requestBody, long timeoutMillis) { diff --git a/proxy/src/test/java/com/automq/rocketmq/proxy/mock/MockMessageStore.java b/proxy/src/test/java/com/automq/rocketmq/proxy/mock/MockMessageStore.java index cb228ee56..451343db7 100644 --- a/proxy/src/test/java/com/automq/rocketmq/proxy/mock/MockMessageStore.java +++ b/proxy/src/test/java/com/automq/rocketmq/proxy/mock/MockMessageStore.java @@ -97,7 +97,8 @@ public CompletableFuture pop(StoreContext context, long consumerGroup } @Override - public CompletableFuture pull(long consumerGroupId, long topicId, int queueId, Filter filter, + public CompletableFuture pull(StoreContext context, long consumerGroupId, long topicId, int queueId, + Filter filter, long offset, int batchSize, boolean retry) { if (retry) { return CompletableFuture.completedFuture(new PullResult(PullResult.Status.NO_NEW_MSG, 0L, 0L, 0L, new ArrayList<>())); diff --git a/store/src/main/java/com/automq/rocketmq/store/MessageStoreImpl.java b/store/src/main/java/com/automq/rocketmq/store/MessageStoreImpl.java index 86a009da2..edddca6dd 100644 --- a/store/src/main/java/com/automq/rocketmq/store/MessageStoreImpl.java +++ b/store/src/main/java/com/automq/rocketmq/store/MessageStoreImpl.java @@ -174,14 +174,16 @@ public CompletableFuture pop(StoreContext context, @SpanAttribute lon } @Override - public CompletableFuture pull(long consumerGroupId, long topicId, int queueId, Filter filter, + @WithSpan(kind = SpanKind.SERVER) + public CompletableFuture pull(StoreContext context, long consumerGroupId, long topicId, int queueId, + Filter filter, long offset, int batchSize, boolean retry) { - return logicQueueManager.getOrCreate(StoreContext.EMPTY, topicId, queueId) + return logicQueueManager.getOrCreate(context, topicId, queueId) .thenCompose(topicQueue -> { if (retry) { - return topicQueue.pullRetry(consumerGroupId, filter, offset, batchSize); + return topicQueue.pullRetry(context, consumerGroupId, filter, offset, batchSize); } - return topicQueue.pullNormal(consumerGroupId, filter, offset, batchSize); + return topicQueue.pullNormal(context, consumerGroupId, filter, offset, batchSize); }); } diff --git a/store/src/main/java/com/automq/rocketmq/store/api/LogicQueue.java b/store/src/main/java/com/automq/rocketmq/store/api/LogicQueue.java index fbf66b6b2..42c64ba28 100644 --- a/store/src/main/java/com/automq/rocketmq/store/api/LogicQueue.java +++ b/store/src/main/java/com/automq/rocketmq/store/api/LogicQueue.java @@ -78,10 +78,12 @@ public abstract CompletableFuture changeInvisible public abstract int getInflightStats(long consumerGroupId); - public abstract CompletableFuture pullNormal(long consumerGroupId, Filter filter, long startOffset, + public abstract CompletableFuture pullNormal(StoreContext context, long consumerGroupId, Filter filter, + long startOffset, int batchSize); - public abstract CompletableFuture pullRetry(long consumerGroupId, Filter filter, long startOffset, + public abstract CompletableFuture pullRetry(StoreContext context, long consumerGroupId, Filter filter, + long startOffset, int batchSize); public abstract long getConsumeOffset(long consumerGroupId); diff --git a/store/src/main/java/com/automq/rocketmq/store/api/MessageStore.java b/store/src/main/java/com/automq/rocketmq/store/api/MessageStore.java index 9fe5d1787..6736e66bb 100644 --- a/store/src/main/java/com/automq/rocketmq/store/api/MessageStore.java +++ b/store/src/main/java/com/automq/rocketmq/store/api/MessageStore.java @@ -66,7 +66,8 @@ CompletableFuture pop(StoreContext context, long consumerGroupId, lon * @param retry whether to pull retry messages * @return pull result, see {@link PullResult} */ - CompletableFuture pull(long consumerGroupId, long topicId, int queueId, Filter filter, long offset, + CompletableFuture pull(StoreContext context, long consumerGroupId, long topicId, int queueId, + Filter filter, long offset, int batchSize, boolean retry); /** diff --git a/store/src/main/java/com/automq/rocketmq/store/queue/StreamLogicQueue.java b/store/src/main/java/com/automq/rocketmq/store/queue/StreamLogicQueue.java index 58d383c9c..171ec5968 100644 --- a/store/src/main/java/com/automq/rocketmq/store/queue/StreamLogicQueue.java +++ b/store/src/main/java/com/automq/rocketmq/store/queue/StreamLogicQueue.java @@ -612,15 +612,18 @@ public int getInflightStats(long consumerGroupId) { } @Override - public CompletableFuture pullNormal(long consumerGroupId, Filter filter, long startOffset, + @WithSpan(kind = SpanKind.SERVER) + public CompletableFuture pullNormal(StoreContext context, long consumerGroupId, Filter filter, + long startOffset, int batchSize) { if (state.get() != State.OPENED) { return CompletableFuture.failedFuture(new StoreException(StoreErrorCode.QUEUE_NOT_OPENED, "Topic queue not opened")); } - return pull(dataStreamId, consumerGroupId, filter, startOffset, batchSize); + return pull(context, dataStreamId, consumerGroupId, filter, startOffset, batchSize); } - private CompletableFuture pull(long streamId, long consumerGroupId, Filter filter, long startOffset, + private CompletableFuture pull(StoreContext context, long streamId, long consumerGroupId, Filter filter, + long startOffset, int batchSize) { int fetchBatchSize; if (filter.needApply()) { @@ -632,7 +635,7 @@ private CompletableFuture pull(long streamId, long consumerGroupId, } FilterFetchResult fetchResult = new FilterFetchResult(startOffset); long operationTimestamp = System.currentTimeMillis(); - CompletableFuture fetchCf = fetchAndFilterMessages(StoreContext.EMPTY, streamId, startOffset, batchSize, + CompletableFuture fetchCf = fetchAndFilterMessages(context, streamId, startOffset, batchSize, fetchBatchSize, filter, fetchResult, 0, 0, operationTimestamp); return fetchCf.thenApply(filterFetchResult -> { List messageExtList = filterFetchResult.messageList; @@ -648,13 +651,15 @@ private CompletableFuture pull(long streamId, long consumerGroupId, } @Override - public CompletableFuture pullRetry(long consumerGroupId, Filter filter, long startOffset, + @WithSpan(kind = SpanKind.SERVER) + public CompletableFuture pullRetry(StoreContext context, long consumerGroupId, Filter filter, + long startOffset, int batchSize) { if (state.get() != State.OPENED) { return CompletableFuture.failedFuture(new StoreException(StoreErrorCode.QUEUE_NOT_OPENED, "Topic queue not opened")); } CompletableFuture retryStreamIdCf = retryStreamId(consumerGroupId); - return retryStreamIdCf.thenCompose(streamId -> pull(streamId, consumerGroupId, filter, startOffset, batchSize)); + return retryStreamIdCf.thenCompose(streamId -> pull(context, streamId, consumerGroupId, filter, startOffset, batchSize)); } @Override diff --git a/store/src/main/java/com/automq/rocketmq/store/service/ReviveService.java b/store/src/main/java/com/automq/rocketmq/store/service/ReviveService.java index da8f78730..2489f35b2 100644 --- a/store/src/main/java/com/automq/rocketmq/store/service/ReviveService.java +++ b/store/src/main/java/com/automq/rocketmq/store/service/ReviveService.java @@ -152,9 +152,9 @@ public void run() { CompletableFuture pullFuture; if (operationType == PopOperation.PopOperationType.POP_RETRY) { - pullFuture = queue.pullRetry(consumerGroupId, Filter.DEFAULT_FILTER, checkPoint.messageOffset(), 1); + pullFuture = queue.pullRetry(context, consumerGroupId, Filter.DEFAULT_FILTER, checkPoint.messageOffset(), 1); } else { - pullFuture = queue.pullNormal(consumerGroupId, Filter.DEFAULT_FILTER, checkPoint.messageOffset(), 1); + pullFuture = queue.pullNormal(context, consumerGroupId, Filter.DEFAULT_FILTER, checkPoint.messageOffset(), 1); } return pullFuture.thenApply(result -> { if (result.messageList().isEmpty()) { diff --git a/store/src/test/java/com/automq/rocketmq/store/service/ReviveServiceTest.java b/store/src/test/java/com/automq/rocketmq/store/service/ReviveServiceTest.java index 38ea06fe9..91b56885b 100644 --- a/store/src/test/java/com/automq/rocketmq/store/service/ReviveServiceTest.java +++ b/store/src/test/java/com/automq/rocketmq/store/service/ReviveServiceTest.java @@ -136,7 +136,7 @@ void revive_normal() throws StoreException { assertNull(ckValue); // check if this message has been appended to retry stream - PullResult retryPullResult = logicQueue.pullRetry(CONSUMER_GROUP_ID, Filter.DEFAULT_FILTER, 0, invisibleDuration).join(); + PullResult retryPullResult = logicQueue.pullRetry(StoreContext.EMPTY, CONSUMER_GROUP_ID, Filter.DEFAULT_FILTER, 0, invisibleDuration).join(); assertEquals(1, retryPullResult.messageList().size()); // pop retry @@ -264,7 +264,7 @@ void revive_dead_letter() throws Exception { assertNull(ckValue); // check if this message has been appended to retry stream - PullResult retryPullResult = logicQueue.pullRetry(CONSUMER_GROUP_ID, Filter.DEFAULT_FILTER, 0, 32).join(); + PullResult retryPullResult = logicQueue.pullRetry(StoreContext.EMPTY, CONSUMER_GROUP_ID, Filter.DEFAULT_FILTER, 0, 32).join(); assertEquals(1, retryPullResult.messageList().size()); // pop retry