From 9ad5975f51020918325c70cdaf228b43c8ac67fb Mon Sep 17 00:00:00 2001 From: "kaiyi.lk" Date: Mon, 24 Apr 2023 15:45:26 +0800 Subject: [PATCH] [ISSUE #6756] Implement pull APIs for proxy --- .../acl/plain/PlainAccessResource.java | 3 +- pom.xml | 2 +- .../proxy/common/utils/ProxyUtils.java | 1 + .../grpc/v2/DefaultGrpcMessingActivity.java | 32 ++++ .../grpc/v2/GrpcMessagingApplication.java | 79 ++++++++ .../proxy/grpc/v2/GrpcMessingActivity.java | 17 ++ .../proxy/grpc/v2/client/ClientActivity.java | 6 +- .../v2/common/GrpcClientSettingsManager.java | 3 +- .../proxy/grpc/v2/common/GrpcConverter.java | 13 ++ .../proxy/grpc/v2/common/GrpcValidator.java | 33 ++++ .../proxy/grpc/v2/common/ResponseBuilder.java | 4 + .../grpc/v2/consumer/PullMessageActivity.java | 174 ++++++++++++++++++ .../PullMessageResponseStreamWriter.java | 103 +++++++++++ .../v2/consumer/ReceiveMessageActivity.java | 38 +--- .../proxy/processor/ConsumerProcessor.java | 24 ++- .../processor/DefaultMessagingProcessor.java | 10 +- .../proxy/processor/MessagingProcessor.java | 48 +++++ .../message/ClusterMessageService.java | 16 +- .../service/message/LocalMessageService.java | 9 +- .../proxy/service/message/MessageService.java | 9 + .../service/mqclient/MQClientAPIExt.java | 10 +- .../apache/rocketmq/test/base/BaseConf.java | 4 +- .../rocketmq/test/grpc/v2/ClusterGrpcIT.java | 5 + .../rocketmq/test/grpc/v2/GrpcBaseIT.java | 123 +++++++++++++ 24 files changed, 714 insertions(+), 52 deletions(-) create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/PullMessageActivity.java create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/PullMessageResponseStreamWriter.java diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java index cdbd9ea9b36d..49fb8781bf3d 100644 --- a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java +++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java @@ -219,7 +219,8 @@ public static PlainAccessResource parse(GeneratedMessageV3 messageV3, Authentica if (HeartbeatRequest.getDescriptor().getFullName().equals(rpcFullName)) { HeartbeatRequest request = (HeartbeatRequest) messageV3; if (ClientType.PUSH_CONSUMER.equals(request.getClientType()) - || ClientType.SIMPLE_CONSUMER.equals(request.getClientType())) { + || ClientType.SIMPLE_CONSUMER.equals(request.getClientType()) + || ClientType.PULL_CONSUMER.equals(request.getClientType())) { if (!request.hasGroup()) { throw new AclException("Consumer heartbeat doesn't have group"); } else { diff --git a/pom.xml b/pom.xml index c009d0c52d51..b0f6fc7e5719 100644 --- a/pom.xml +++ b/pom.xml @@ -125,7 +125,7 @@ 6.0.53 1.0-beta-4 1.4.2 - 2.0.2 + 2.0.3-SNAPSHOT 1.50.0 3.20.1 1.2.10 diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/utils/ProxyUtils.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/utils/ProxyUtils.java index 7e82a49613b2..d87498f9abf6 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/utils/ProxyUtils.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/utils/ProxyUtils.java @@ -19,6 +19,7 @@ public class ProxyUtils { public static final int MAX_MSG_NUMS_FOR_POP_REQUEST = 32; + public static final int MAX_MSG_NUMS_FOR_PULL_REQUEST = 32; public static final String BROKER_ADDR = "brokerAddr"; } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java index 194b9204f4b9..9a6dc10e7202 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java @@ -24,12 +24,18 @@ import apache.rocketmq.v2.EndTransactionResponse; import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest; import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse; +import apache.rocketmq.v2.GetOffsetRequest; +import apache.rocketmq.v2.GetOffsetResponse; import apache.rocketmq.v2.HeartbeatRequest; import apache.rocketmq.v2.HeartbeatResponse; import apache.rocketmq.v2.NotifyClientTerminationRequest; import apache.rocketmq.v2.NotifyClientTerminationResponse; +import apache.rocketmq.v2.PullMessageRequest; +import apache.rocketmq.v2.PullMessageResponse; import apache.rocketmq.v2.QueryAssignmentRequest; import apache.rocketmq.v2.QueryAssignmentResponse; +import apache.rocketmq.v2.QueryOffsetRequest; +import apache.rocketmq.v2.QueryOffsetResponse; import apache.rocketmq.v2.QueryRouteRequest; import apache.rocketmq.v2.QueryRouteResponse; import apache.rocketmq.v2.ReceiveMessageRequest; @@ -37,6 +43,8 @@ import apache.rocketmq.v2.SendMessageRequest; import apache.rocketmq.v2.SendMessageResponse; import apache.rocketmq.v2.TelemetryCommand; +import apache.rocketmq.v2.UpdateOffsetRequest; +import apache.rocketmq.v2.UpdateOffsetResponse; import io.grpc.stub.StreamObserver; import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.common.constant.LoggerName; @@ -49,6 +57,7 @@ import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager; import org.apache.rocketmq.proxy.grpc.v2.consumer.AckMessageActivity; import org.apache.rocketmq.proxy.grpc.v2.consumer.ChangeInvisibleDurationActivity; +import org.apache.rocketmq.proxy.grpc.v2.consumer.PullMessageActivity; import org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageActivity; import org.apache.rocketmq.proxy.grpc.v2.producer.ForwardMessageToDLQActivity; import org.apache.rocketmq.proxy.grpc.v2.producer.SendMessageActivity; @@ -64,6 +73,7 @@ public class DefaultGrpcMessingActivity extends AbstractStartAndShutdown impleme protected GrpcChannelManager grpcChannelManager; protected ReceiptHandleProcessor receiptHandleProcessor; protected ReceiveMessageActivity receiveMessageActivity; + protected PullMessageActivity pullMessageActivity; protected AckMessageActivity ackMessageActivity; protected ChangeInvisibleDurationActivity changeInvisibleDurationActivity; protected SendMessageActivity sendMessageActivity; @@ -82,6 +92,7 @@ protected void init(MessagingProcessor messagingProcessor) { this.receiptHandleProcessor = new ReceiptHandleProcessor(messagingProcessor); this.receiveMessageActivity = new ReceiveMessageActivity(messagingProcessor, receiptHandleProcessor, grpcClientSettingsManager, grpcChannelManager); + this.pullMessageActivity = new PullMessageActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); this.ackMessageActivity = new AckMessageActivity(messagingProcessor, receiptHandleProcessor, grpcClientSettingsManager, grpcChannelManager); this.changeInvisibleDurationActivity = new ChangeInvisibleDurationActivity(messagingProcessor, receiptHandleProcessor, grpcClientSettingsManager, grpcChannelManager); this.sendMessageActivity = new SendMessageActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); @@ -149,6 +160,27 @@ public CompletableFuture changeInvisibleDuratio return this.changeInvisibleDurationActivity.changeInvisibleDuration(ctx, request); } + @Override + public void pullMessage(ProxyContext ctx, PullMessageRequest request, + StreamObserver responseObserver) { + this.pullMessageActivity.pullMessage(ctx, request, responseObserver); + } + + @Override + public CompletableFuture updateOffset(ProxyContext ctx, UpdateOffsetRequest request) { + return this.pullMessageActivity.updateOffset(ctx, request); + } + + @Override + public CompletableFuture getOffset(ProxyContext ctx, GetOffsetRequest request) { + return this.pullMessageActivity.getOffset(ctx, request); + } + + @Override + public CompletableFuture queryOffset(ProxyContext ctx, QueryOffsetRequest request) { + return this.pullMessageActivity.queryOffset(ctx, request); + } + @Override public StreamObserver telemetry(ProxyContext ctx, StreamObserver responseObserver) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java index f283b25ff813..5b4b33375bd5 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java @@ -26,13 +26,19 @@ import apache.rocketmq.v2.EndTransactionResponse; import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest; import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse; +import apache.rocketmq.v2.GetOffsetRequest; +import apache.rocketmq.v2.GetOffsetResponse; import apache.rocketmq.v2.HeartbeatRequest; import apache.rocketmq.v2.HeartbeatResponse; import apache.rocketmq.v2.MessagingServiceGrpc; import apache.rocketmq.v2.NotifyClientTerminationRequest; import apache.rocketmq.v2.NotifyClientTerminationResponse; +import apache.rocketmq.v2.PullMessageRequest; +import apache.rocketmq.v2.PullMessageResponse; import apache.rocketmq.v2.QueryAssignmentRequest; import apache.rocketmq.v2.QueryAssignmentResponse; +import apache.rocketmq.v2.QueryOffsetRequest; +import apache.rocketmq.v2.QueryOffsetResponse; import apache.rocketmq.v2.QueryRouteRequest; import apache.rocketmq.v2.QueryRouteResponse; import apache.rocketmq.v2.ReceiveMessageRequest; @@ -41,6 +47,8 @@ import apache.rocketmq.v2.SendMessageResponse; import apache.rocketmq.v2.Status; import apache.rocketmq.v2.TelemetryCommand; +import apache.rocketmq.v2.UpdateOffsetRequest; +import apache.rocketmq.v2.UpdateOffsetResponse; import io.grpc.Context; import io.grpc.Metadata; import io.grpc.stub.StreamObserver; @@ -373,6 +381,77 @@ public void changeInvisibleDuration(ChangeInvisibleDurationRequest request, } } + @Override + public void pullMessage(PullMessageRequest request, StreamObserver responseObserver) { + Function statusResponseCreator = status -> PullMessageResponse.newBuilder().setStatus(status).build(); + ProxyContext context = createContext(); + try { + validateContext(context); + this.addExecutor(this.consumerThreadPoolExecutor, + context, + request, + () -> grpcMessingActivity.pullMessage(context, request, responseObserver), + responseObserver, + statusResponseCreator); + } catch (Throwable t) { + writeResponse(context, request, null, responseObserver, t, statusResponseCreator); + } + } + + @Override + public void updateOffset(UpdateOffsetRequest request, StreamObserver responseObserver) { + Function statusResponseCreator = status -> UpdateOffsetResponse.newBuilder().setStatus(status).build(); + ProxyContext context = createContext(); + try { + validateContext(context); + this.addExecutor(this.consumerThreadPoolExecutor, + context, + request, + () -> grpcMessingActivity.updateOffset(context, request) + .whenComplete((response, throwable) -> writeResponse(context, request, response, responseObserver, throwable, statusResponseCreator)), + responseObserver, + statusResponseCreator); + } catch (Throwable t) { + writeResponse(context, request, null, responseObserver, t, statusResponseCreator); + } + } + + @Override + public void getOffset(GetOffsetRequest request, StreamObserver responseObserver) { + Function statusResponseCreator = status -> GetOffsetResponse.newBuilder().setStatus(status).build(); + ProxyContext context = createContext(); + try { + validateContext(context); + this.addExecutor(this.consumerThreadPoolExecutor, + context, + request, + () -> grpcMessingActivity.getOffset(context, request) + .whenComplete((response, throwable) -> writeResponse(context, request, response, responseObserver, throwable, statusResponseCreator)), + responseObserver, + statusResponseCreator); + } catch (Throwable t) { + writeResponse(context, request, null, responseObserver, t, statusResponseCreator); + } + } + + @Override + public void queryOffset(QueryOffsetRequest request, StreamObserver responseObserver) { + Function statusResponseCreator = status -> QueryOffsetResponse.newBuilder().setStatus(status).build(); + ProxyContext context = createContext(); + try { + validateContext(context); + this.addExecutor(this.consumerThreadPoolExecutor, + context, + request, + () -> grpcMessingActivity.queryOffset(context, request) + .whenComplete((response, throwable) -> writeResponse(context, request, response, responseObserver, throwable, statusResponseCreator)), + responseObserver, + statusResponseCreator); + } catch (Throwable t) { + writeResponse(context, request, null, responseObserver, t, statusResponseCreator); + } + } + @Override public StreamObserver telemetry(StreamObserver responseObserver) { Function statusResponseCreator = status -> TelemetryCommand.newBuilder().setStatus(status).build(); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessingActivity.java index 0f353e94db6e..bb34a6f84954 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessingActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessingActivity.java @@ -25,12 +25,18 @@ import apache.rocketmq.v2.EndTransactionResponse; import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest; import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse; +import apache.rocketmq.v2.GetOffsetRequest; +import apache.rocketmq.v2.GetOffsetResponse; import apache.rocketmq.v2.HeartbeatRequest; import apache.rocketmq.v2.HeartbeatResponse; import apache.rocketmq.v2.NotifyClientTerminationRequest; import apache.rocketmq.v2.NotifyClientTerminationResponse; +import apache.rocketmq.v2.PullMessageRequest; +import apache.rocketmq.v2.PullMessageResponse; import apache.rocketmq.v2.QueryAssignmentRequest; import apache.rocketmq.v2.QueryAssignmentResponse; +import apache.rocketmq.v2.QueryOffsetRequest; +import apache.rocketmq.v2.QueryOffsetResponse; import apache.rocketmq.v2.QueryRouteRequest; import apache.rocketmq.v2.QueryRouteResponse; import apache.rocketmq.v2.ReceiveMessageRequest; @@ -38,6 +44,8 @@ import apache.rocketmq.v2.SendMessageRequest; import apache.rocketmq.v2.SendMessageResponse; import apache.rocketmq.v2.TelemetryCommand; +import apache.rocketmq.v2.UpdateOffsetRequest; +import apache.rocketmq.v2.UpdateOffsetResponse; import io.grpc.stub.StreamObserver; import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.proxy.common.ProxyContext; @@ -69,5 +77,14 @@ CompletableFuture notifyClientTermination(Proxy CompletableFuture changeInvisibleDuration(ProxyContext ctx, ChangeInvisibleDurationRequest request); + void pullMessage(ProxyContext ctx, PullMessageRequest request, + StreamObserver responseObserver); + + CompletableFuture updateOffset(ProxyContext ctx, UpdateOffsetRequest request); + + CompletableFuture getOffset(ProxyContext ctx, GetOffsetRequest request); + + CompletableFuture queryOffset(ProxyContext ctx, QueryOffsetRequest request); + StreamObserver telemetry(ProxyContext ctx, StreamObserver responseObserver); } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java index de8fba4a6379..d9ffa10a6892 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java @@ -105,6 +105,7 @@ public CompletableFuture heartbeat(ProxyContext ctx, Heartbea } break; } + case PULL_CONSUMER: case PUSH_CONSUMER: case SIMPLE_CONSUMER: { validateConsumerGroup(request.getGroup()); @@ -149,6 +150,7 @@ public CompletableFuture notifyClientTerminatio } } break; + case PULL_CONSUMER: case PUSH_CONSUMER: case SIMPLE_CONSUMER: validateConsumerGroup(request.getGroup()); @@ -385,9 +387,11 @@ protected ConsumeMessageDirectlyResult buildConsumeMessageDirectlyResult(Status protected ConsumeType buildConsumeType(ClientType clientType) { switch (clientType) { case SIMPLE_CONSUMER: - return ConsumeType.CONSUME_ACTIVELY; + return ConsumeType.CONSUME_POP; case PUSH_CONSUMER: return ConsumeType.CONSUME_PASSIVELY; + case PULL_CONSUMER: + return ConsumeType.CONSUME_ACTIVELY; default: throw new IllegalArgumentException("Client type is not consumer, type: " + clientType); } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java index dcb6194165a2..39d5ba95cfef 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java @@ -19,7 +19,6 @@ import apache.rocketmq.v2.Address; import apache.rocketmq.v2.AddressScheme; -import apache.rocketmq.v2.ClientType; import apache.rocketmq.v2.CustomizedBackoff; import apache.rocketmq.v2.Endpoints; import apache.rocketmq.v2.ExponentialBackoff; @@ -233,7 +232,7 @@ protected void onWaitEnd() { for (String clientId : clientIdSet) { try { CLIENT_SETTINGS_MAP.computeIfPresent(clientId, (clientIdKey, settings) -> { - if (!settings.getClientType().equals(ClientType.PUSH_CONSUMER) && !settings.getClientType().equals(ClientType.SIMPLE_CONSUMER)) { + if (!GrpcValidator.getInstance().isConsumer(settings.getClientType())) { return settings; } String consumerGroup = GrpcConverter.getInstance().wrapResourceWithNamespace(settings.getSubscription().getGroup()); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java index 4daf83511960..6482a89d33f0 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java @@ -18,10 +18,12 @@ package org.apache.rocketmq.proxy.grpc.v2.common; import apache.rocketmq.v2.Broker; +import apache.rocketmq.v2.Code; import apache.rocketmq.v2.DeadLetterQueue; import apache.rocketmq.v2.Digest; import apache.rocketmq.v2.DigestType; import apache.rocketmq.v2.Encoding; +import apache.rocketmq.v2.FilterExpression; import apache.rocketmq.v2.FilterType; import apache.rocketmq.v2.Message; import apache.rocketmq.v2.MessageQueue; @@ -46,6 +48,8 @@ import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.protocol.NamespaceUtil; +import org.apache.rocketmq.remoting.protocol.filter.FilterAPI; +import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; public class GrpcConverter { private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); @@ -86,6 +90,15 @@ public MessageQueue buildMessageQueue(MessageExt messageExt, String brokerName) .build(); } + public SubscriptionData buildSubscriptionData(String rocketmqTopic, FilterExpression filterExpression) { + try { + return FilterAPI.build(rocketmqTopic, filterExpression.getExpression(), + GrpcConverter.getInstance().buildExpressionType(filterExpression.getType())); + } catch (Exception e) { + throw new GrpcProxyException(Code.ILLEGAL_FILTER_EXPRESSION, e.getMessage()); + } + } + public String buildExpressionType(FilterType filterType) { switch (filterType) { case SQL: diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcValidator.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcValidator.java index cfcd2a269380..36d47ed0971d 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcValidator.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcValidator.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.proxy.grpc.v2.common; +import apache.rocketmq.v2.ClientType; import apache.rocketmq.v2.Code; import apache.rocketmq.v2.Resource; import com.google.common.base.CharMatcher; @@ -29,10 +30,12 @@ import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.config.ConfigurationManager; +import org.apache.rocketmq.proxy.config.ProxyConfig; public class GrpcValidator { protected static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); + protected static final String ILLEGAL_POLLING_TIME_INTRODUCED_CLIENT_VERSION = "5.0.3"; protected static final Object INSTANCE_CREATE_LOCK = new Object(); protected static volatile GrpcValidator instance; @@ -47,6 +50,36 @@ public static GrpcValidator getInstance() { return instance; } + public boolean isConsumer(ClientType clientType) { + return ClientType.SIMPLE_CONSUMER.equals(clientType) || + ClientType.PUSH_CONSUMER.equals(clientType) || + ClientType.PULL_CONSUMER.equals(clientType); + } + + public long reasonableLongPollingTimeout(Long longPollingTimeout, Long timeRemaining, String clientVersion) { + long pollingTime = longPollingTimeout; + ProxyConfig config = ConfigurationManager.getProxyConfig(); + if (pollingTime < config.getGrpcClientConsumerMinLongPollingTimeoutMillis()) { + pollingTime = config.getGrpcClientConsumerMinLongPollingTimeoutMillis(); + } + if (pollingTime > config.getGrpcClientConsumerMaxLongPollingTimeoutMillis()) { + pollingTime = config.getGrpcClientConsumerMaxLongPollingTimeoutMillis(); + } + + if (pollingTime > timeRemaining) { + if (timeRemaining >= config.getGrpcClientConsumerMinLongPollingTimeoutMillis()) { + pollingTime = timeRemaining; + } else { + Code code = + null == clientVersion || ILLEGAL_POLLING_TIME_INTRODUCED_CLIENT_VERSION.compareTo(clientVersion) > 0 ? + Code.BAD_REQUEST : Code.ILLEGAL_POLLING_TIME; + throw new GrpcProxyException(code, "The deadline time remaining is not enough" + + " for polling, please check network condition"); + } + } + return pollingTime; + } + public void validateTopic(Resource topic) { validateTopic(GrpcConverter.getInstance().wrapResourceWithNamespace(topic)); } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/ResponseBuilder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/ResponseBuilder.java index 0b3c85ea6748..64e7baf55c27 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/ResponseBuilder.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/ResponseBuilder.java @@ -24,6 +24,7 @@ import org.apache.rocketmq.client.common.ClientErrorCode; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.exception.OffsetNotFoundException; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; @@ -73,6 +74,9 @@ public Status buildStatus(Throwable t) { if (TopicRouteHelper.isTopicNotExistError(t)) { return buildStatus(Code.TOPIC_NOT_FOUND, t.getMessage()); } + if (t instanceof OffsetNotFoundException) { + return buildStatus(Code.OFFSET_NOT_FOUND, "offset not found"); + } if (t instanceof MQBrokerException) { MQBrokerException mqBrokerException = (MQBrokerException) t; return buildStatus(buildCode(mqBrokerException.getResponseCode()), mqBrokerException.getErrorMessage()); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/PullMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/PullMessageActivity.java new file mode 100644 index 000000000000..a3746b84aff0 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/PullMessageActivity.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.grpc.v2.consumer; + +import apache.rocketmq.v2.Broker; +import apache.rocketmq.v2.Code; +import apache.rocketmq.v2.GetOffsetRequest; +import apache.rocketmq.v2.GetOffsetResponse; +import apache.rocketmq.v2.PullMessageRequest; +import apache.rocketmq.v2.PullMessageResponse; +import apache.rocketmq.v2.QueryOffsetPolicy; +import apache.rocketmq.v2.QueryOffsetRequest; +import apache.rocketmq.v2.QueryOffsetResponse; +import apache.rocketmq.v2.UpdateOffsetRequest; +import apache.rocketmq.v2.UpdateOffsetResponse; +import com.google.protobuf.util.Durations; +import com.google.protobuf.util.Timestamps; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.CompletableFuture; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.sysflag.PullSysFlag; +import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.grpc.v2.AbstractMessingActivity; +import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager; +import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager; +import org.apache.rocketmq.proxy.grpc.v2.common.GrpcConverter; +import org.apache.rocketmq.proxy.grpc.v2.common.GrpcProxyException; +import org.apache.rocketmq.proxy.grpc.v2.common.GrpcValidator; +import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder; +import org.apache.rocketmq.proxy.processor.MessagingProcessor; +import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; + +public class PullMessageActivity extends AbstractMessingActivity { + + public PullMessageActivity(MessagingProcessor messagingProcessor, + GrpcClientSettingsManager grpcClientSettingsManager, + GrpcChannelManager grpcChannelManager) { + super(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); + } + + public void pullMessage(ProxyContext ctx, PullMessageRequest request, + StreamObserver responseObserver) { + PullMessageResponseStreamWriter writer = createWriter(ctx, responseObserver); + try { + long pollingTime = GrpcValidator.getInstance().reasonableLongPollingTimeout( + Durations.toMillis(request.getLongPollingTimeout()), + ctx.getRemainingMs(), + ctx.getClientVersion() + ); + + Broker broker = request.getMessageQueue().getBroker(); + String topic = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getMessageQueue().getTopic()); + String group = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup()); + SubscriptionData subscriptionData = GrpcConverter.getInstance().buildSubscriptionData(topic, request.getFilterExpression()); + + this.messagingProcessor.pullMessage( + ctx, + new MessageQueue(topic, broker.getName(), request.getMessageQueue().getId()), + group, + request.getOffset(), + request.getBatchSize(), + PullSysFlag.buildSysFlag(false, true, true, false), + 0, + pollingTime, + subscriptionData, + ctx.getRemainingMs() + ) + .thenAccept(pullResult -> writer.writeAndComplete(ctx, pullResult)) + .exceptionally(t -> { + writer.writeAndComplete(ctx, t); + return null; + }); + } catch (Throwable t) { + writer.writeAndComplete(ctx, t); + } + } + + protected PullMessageResponseStreamWriter createWriter(ProxyContext ctx, + StreamObserver responseObserver) { + return new PullMessageResponseStreamWriter(responseObserver); + } + + public CompletableFuture updateOffset(ProxyContext ctx, UpdateOffsetRequest request) { + CompletableFuture future = new CompletableFuture<>(); + try { + Broker broker = request.getMessageQueue().getBroker(); + String topic = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getMessageQueue().getTopic()); + String group = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup()); + + return this.messagingProcessor.updateConsumerOffset( + ctx, + new MessageQueue(topic, broker.getName(), request.getMessageQueue().getId()), + group, + request.getOffset(), + false + ).thenApply(r -> UpdateOffsetResponse.newBuilder() + .setStatus(ResponseBuilder.getInstance().buildStatus(Code.OK, Code.OK.name())) + .build()); + } catch (Throwable t) { + future.completeExceptionally(t); + } + return future; + } + + public CompletableFuture getOffset(ProxyContext ctx, GetOffsetRequest request) { + CompletableFuture future = new CompletableFuture<>(); + try { + Broker broker = request.getMessageQueue().getBroker(); + String topic = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getMessageQueue().getTopic()); + String group = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup()); + + return this.messagingProcessor.queryConsumerOffset( + ctx, + new MessageQueue(topic, broker.getName(), request.getMessageQueue().getId()), + group + ).thenApply(offset -> GetOffsetResponse.newBuilder() + .setStatus(ResponseBuilder.getInstance().buildStatus(Code.OK, Code.OK.name())) + .setOffset(offset) + .build()); + } catch (Throwable t) { + future.completeExceptionally(t); + } + return future; + } + + public CompletableFuture queryOffset(ProxyContext ctx, QueryOffsetRequest request) { + CompletableFuture future = new CompletableFuture<>(); + try { + Broker broker = request.getMessageQueue().getBroker(); + String topic = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getMessageQueue().getTopic()); + MessageQueue messageQueue = new MessageQueue(topic, broker.getName(), request.getMessageQueue().getId()); + + QueryOffsetPolicy queryOffsetPolicy = request.getQueryOffsetPolicy(); + CompletableFuture responseFuture; + switch (queryOffsetPolicy) { + case BEGINNING: + responseFuture = this.messagingProcessor.getMinOffset(ctx, messageQueue); + break; + case END: + responseFuture = this.messagingProcessor.getMaxOffset(ctx, messageQueue); + break; + case TIMESTAMP: + if (!request.hasTimestamp()) { + throw new GrpcProxyException(Code.BAD_REQUEST, "timestamp is required when queryOffsetPolicy is TIMESTAMP"); + } + responseFuture = this.messagingProcessor.searchOffset(ctx, messageQueue, Timestamps.toMillis(request.getTimestamp())); + break; + default: + throw new GrpcProxyException(Code.BAD_REQUEST, "unrecognized queryOffsetPolicy"); + } + return responseFuture.thenApply(offset -> QueryOffsetResponse.newBuilder() + .setStatus(ResponseBuilder.getInstance().buildStatus(Code.OK, Code.OK.name())) + .setOffset(offset).build()); + } catch (Throwable t) { + future.completeExceptionally(t); + } + return future; + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/PullMessageResponseStreamWriter.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/PullMessageResponseStreamWriter.java new file mode 100644 index 000000000000..e06572edf85a --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/PullMessageResponseStreamWriter.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.grpc.v2.consumer; + +import apache.rocketmq.v2.Code; +import apache.rocketmq.v2.PullMessageResponse; +import io.grpc.stub.StreamObserver; +import java.util.List; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.grpc.v2.common.GrpcConverter; +import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder; + +public class PullMessageResponseStreamWriter { + private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); + protected final StreamObserver streamObserver; + + public PullMessageResponseStreamWriter(StreamObserver streamObserver) { + this.streamObserver = streamObserver; + } + + public void writeAndComplete(ProxyContext ctx, PullResult pullResult) { + List messageFoundList = pullResult.getMsgFoundList(); + try { + switch (pullResult.getPullStatus()) { + case FOUND: + if (messageFoundList.isEmpty()) { + streamObserver.onNext(PullMessageResponse.newBuilder() + .setStatus(ResponseBuilder.getInstance().buildStatus(Code.MESSAGE_NOT_FOUND, "no match message")) + .build()); + } else { + streamObserver.onNext(PullMessageResponse.newBuilder() + .setStatus(ResponseBuilder.getInstance().buildStatus(Code.OK, Code.OK.name())) + .build()); + for (MessageExt messageExt : messageFoundList) { + streamObserver.onNext(PullMessageResponse.newBuilder() + .setMessage(GrpcConverter.getInstance().buildMessage(messageExt)) + .build()); + } + } + break; + case OFFSET_ILLEGAL: + streamObserver.onNext(PullMessageResponse.newBuilder() + .setStatus(ResponseBuilder.getInstance().buildStatus(Code.ILLEGAL_OFFSET, Code.ILLEGAL_OFFSET.name())) + .build()); + break; + case NO_NEW_MSG: + case NO_MATCHED_MSG: + default: + streamObserver.onNext(PullMessageResponse.newBuilder() + .setStatus(ResponseBuilder.getInstance().buildStatus(Code.MESSAGE_NOT_FOUND, "no new message")) + .build()); + break; + } + streamObserver.onNext(PullMessageResponse.newBuilder() + .setNextOffset(pullResult.getNextBeginOffset()) + .build()); + } catch (Throwable t) { + log.warn("err when write pull message response. pullResult:{}", pullResult, t); + writeAndComplete(ctx, t); + } finally { + onCompleted(); + } + } + + public void writeAndComplete(ProxyContext ctx, Throwable throwable) { + try { + streamObserver.onNext(PullMessageResponse.newBuilder() + .setStatus(ResponseBuilder.getInstance().buildStatus(throwable)) + .build()); + streamObserver.onCompleted(); + } catch (Throwable t) { + log.warn("err when write throwable when pull message. throwable:{}", throwable, t); + onCompleted(); + } + } + + protected void onCompleted() { + try { + streamObserver.onCompleted(); + } catch (Throwable ignored) { + } + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java index 9df4101f7326..c3f7c9f847cd 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java @@ -16,8 +16,6 @@ */ package org.apache.rocketmq.proxy.grpc.v2.consumer; -import apache.rocketmq.v2.Code; -import apache.rocketmq.v2.FilterExpression; import apache.rocketmq.v2.ReceiveMessageRequest; import apache.rocketmq.v2.ReceiveMessageResponse; import apache.rocketmq.v2.Settings; @@ -38,18 +36,17 @@ import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager; import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager; import org.apache.rocketmq.proxy.grpc.v2.common.GrpcConverter; +import org.apache.rocketmq.proxy.grpc.v2.common.GrpcValidator; import org.apache.rocketmq.proxy.processor.MessagingProcessor; import org.apache.rocketmq.proxy.processor.QueueSelector; import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor; import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue; import org.apache.rocketmq.proxy.service.route.MessageQueueSelector; import org.apache.rocketmq.proxy.service.route.MessageQueueView; -import org.apache.rocketmq.remoting.protocol.filter.FilterAPI; import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; public class ReceiveMessageActivity extends AbstractMessingActivity { protected ReceiptHandleProcessor receiptHandleProcessor; - private static final String ILLEGAL_POLLING_TIME_INTRODUCED_CLIENT_VERSION = "5.0.3"; public ReceiveMessageActivity(MessagingProcessor messagingProcessor, ReceiptHandleProcessor receiptHandleProcessor, GrpcClientSettingsManager grpcClientSettingsManager, GrpcChannelManager grpcChannelManager) { @@ -66,7 +63,6 @@ public void receiveMessage(ProxyContext ctx, ReceiveMessageRequest request, Subscription subscription = settings.getSubscription(); boolean fifo = subscription.getFifo(); int maxAttempts = settings.getBackoffPolicy().getMaxAttempts(); - ProxyConfig config = ConfigurationManager.getProxyConfig(); Long timeRemaining = ctx.getRemainingMs(); long pollingTime; @@ -75,26 +71,7 @@ public void receiveMessage(ProxyContext ctx, ReceiveMessageRequest request, } else { pollingTime = timeRemaining - Durations.toMillis(settings.getRequestTimeout()) / 2; } - if (pollingTime < config.getGrpcClientConsumerMinLongPollingTimeoutMillis()) { - pollingTime = config.getGrpcClientConsumerMinLongPollingTimeoutMillis(); - } - if (pollingTime > config.getGrpcClientConsumerMaxLongPollingTimeoutMillis()) { - pollingTime = config.getGrpcClientConsumerMaxLongPollingTimeoutMillis(); - } - - if (pollingTime > timeRemaining) { - if (timeRemaining >= config.getGrpcClientConsumerMinLongPollingTimeoutMillis()) { - pollingTime = timeRemaining; - } else { - final String clientVersion = ctx.getClientVersion(); - Code code = - null == clientVersion || ILLEGAL_POLLING_TIME_INTRODUCED_CLIENT_VERSION.compareTo(clientVersion) > 0 ? - Code.BAD_REQUEST : Code.ILLEGAL_POLLING_TIME; - writer.writeAndComplete(ctx, code, "The deadline time remaining is not enough" + - " for polling, please check network condition"); - return; - } - } + pollingTime = GrpcValidator.getInstance().reasonableLongPollingTimeout(pollingTime, timeRemaining, ctx.getClientVersion()); validateTopicAndConsumerGroup(request.getMessageQueue().getTopic(), request.getGroup()); String topic = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getMessageQueue().getTopic()); @@ -109,16 +86,7 @@ public void receiveMessage(ProxyContext ctx, ReceiveMessageRequest request, ConfigurationManager.getProxyConfig().getMinInvisibleTimeMillsForRecv()); } - FilterExpression filterExpression = request.getFilterExpression(); - SubscriptionData subscriptionData; - try { - subscriptionData = FilterAPI.build(topic, filterExpression.getExpression(), - GrpcConverter.getInstance().buildExpressionType(filterExpression.getType())); - } catch (Exception e) { - writer.writeAndComplete(ctx, Code.ILLEGAL_FILTER_EXPRESSION, e.getMessage()); - return; - } - + SubscriptionData subscriptionData = GrpcConverter.getInstance().buildSubscriptionData(topic, request.getFilterExpression()); this.messagingProcessor.popMessage( ctx, new ReceiveMessageQueueSelector( diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java index d67f4b855d93..a21b27e2f6de 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java @@ -57,6 +57,7 @@ import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.SearchOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; @@ -286,6 +287,8 @@ public CompletableFuture pullMessage(ProxyContext ctx, MessageQueue long suspendTimeoutMillis, SubscriptionData subscriptionData, long timeoutMillis) { CompletableFuture future = new CompletableFuture<>(); try { + maxMsgNums = Math.min(ProxyUtils.MAX_MSG_NUMS_FOR_PULL_REQUEST, Math.max(1, maxMsgNums)); + AddressableMessageQueue addressableMessageQueue = serviceManager.getTopicRouteService() .buildAddressableMessageQueue(messageQueue); PullMessageRequestHeader requestHeader = new PullMessageRequestHeader(); @@ -298,6 +301,7 @@ public CompletableFuture pullMessage(ProxyContext ctx, MessageQueue requestHeader.setCommitOffset(commitOffset); requestHeader.setSuspendTimeoutMillis(suspendTimeoutMillis); requestHeader.setSubscription(subscriptionData.getSubString()); + requestHeader.setSubVersion(System.currentTimeMillis()); requestHeader.setExpressionType(subscriptionData.getExpressionType()); future = serviceManager.getMessageService().pullMessage(ctx, addressableMessageQueue, requestHeader, timeoutMillis); } catch (Throwable t) { @@ -307,7 +311,7 @@ public CompletableFuture pullMessage(ProxyContext ctx, MessageQueue } public CompletableFuture updateConsumerOffset(ProxyContext ctx, MessageQueue messageQueue, - String consumerGroup, long commitOffset, long timeoutMillis) { + String consumerGroup, long commitOffset, boolean oneWay, long timeoutMillis) { CompletableFuture future = new CompletableFuture<>(); try { AddressableMessageQueue addressableMessageQueue = serviceManager.getTopicRouteService() @@ -317,7 +321,7 @@ public CompletableFuture updateConsumerOffset(ProxyContext ctx, MessageQue requestHeader.setTopic(addressableMessageQueue.getTopic()); requestHeader.setQueueId(addressableMessageQueue.getQueueId()); requestHeader.setCommitOffset(commitOffset); - future = serviceManager.getMessageService().updateConsumerOffset(ctx, addressableMessageQueue, requestHeader, timeoutMillis); + future = serviceManager.getMessageService().updateConsumerOffset(ctx, addressableMessageQueue, requestHeader, oneWay, timeoutMillis); } catch (Throwable t) { future.completeExceptionally(t); } @@ -420,6 +424,22 @@ public CompletableFuture getMinOffset(ProxyContext ctx, MessageQueue messa return FutureUtils.addExecutor(future, this.executor); } + public CompletableFuture searchOffset(ProxyContext ctx, MessageQueue messageQueue, long timestamp, long timeoutMillis) { + CompletableFuture future = new CompletableFuture<>(); + try { + AddressableMessageQueue addressableMessageQueue = serviceManager.getTopicRouteService() + .buildAddressableMessageQueue(messageQueue); + SearchOffsetRequestHeader requestHeader = new SearchOffsetRequestHeader(); + requestHeader.setTopic(addressableMessageQueue.getTopic()); + requestHeader.setQueueId(addressableMessageQueue.getQueueId()); + requestHeader.setTimestamp(timestamp); + future = serviceManager.getMessageService().searchOffset(ctx, addressableMessageQueue, requestHeader, timeoutMillis); + } catch (Throwable t) { + future.completeExceptionally(t); + } + return FutureUtils.addExecutor(future, this.executor); + } + protected Set buildAddressableSet(Set mqSet) { return mqSet.stream().map(mq -> { try { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java index b9800d0ccacc..f8604baed37b 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java @@ -196,8 +196,8 @@ public CompletableFuture pullMessage(ProxyContext ctx, MessageQueue @Override public CompletableFuture updateConsumerOffset(ProxyContext ctx, MessageQueue messageQueue, - String consumerGroup, long commitOffset, long timeoutMillis) { - return this.consumerProcessor.updateConsumerOffset(ctx, messageQueue, consumerGroup, commitOffset, timeoutMillis); + String consumerGroup, long commitOffset, boolean oneWay, long timeoutMillis) { + return this.consumerProcessor.updateConsumerOffset(ctx, messageQueue, consumerGroup, commitOffset, oneWay, timeoutMillis); } @Override @@ -229,6 +229,12 @@ public CompletableFuture getMinOffset(ProxyContext ctx, MessageQueue messa return this.consumerProcessor.getMinOffset(ctx, messageQueue, timeoutMillis); } + @Override + public CompletableFuture searchOffset(ProxyContext ctx, MessageQueue messageQueue, long timestamp, + long timeoutMillis) { + return this.consumerProcessor.searchOffset(ctx, messageQueue, timestamp, timeoutMillis); + } + @Override public CompletableFuture request(ProxyContext ctx, String brokerName, RemotingCommand request, long timeoutMillis) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java index 3c4e6303fcd8..c66c3da92f32 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java @@ -187,14 +187,33 @@ CompletableFuture pullMessage( long timeoutMillis ); + default CompletableFuture updateConsumerOffset( + ProxyContext ctx, + MessageQueue messageQueue, + String consumerGroup, + long commitOffset, + boolean oneWay + ) { + return updateConsumerOffset(ctx, messageQueue, consumerGroup, commitOffset, oneWay, DEFAULT_TIMEOUT_MILLS); + } + CompletableFuture updateConsumerOffset( ProxyContext ctx, MessageQueue messageQueue, String consumerGroup, long commitOffset, + boolean oneWay, long timeoutMillis ); + default CompletableFuture queryConsumerOffset( + ProxyContext ctx, + MessageQueue messageQueue, + String consumerGroup + ) { + return queryConsumerOffset(ctx, messageQueue, consumerGroup, DEFAULT_TIMEOUT_MILLS); + } + CompletableFuture queryConsumerOffset( ProxyContext ctx, MessageQueue messageQueue, @@ -218,18 +237,47 @@ CompletableFuture unlockBatchMQ( long timeoutMillis ); + default CompletableFuture getMaxOffset( + ProxyContext ctx, + MessageQueue messageQueue + ) { + return getMaxOffset(ctx, messageQueue, DEFAULT_TIMEOUT_MILLS); + } + CompletableFuture getMaxOffset( ProxyContext ctx, MessageQueue messageQueue, long timeoutMillis ); + default CompletableFuture getMinOffset( + ProxyContext ctx, + MessageQueue messageQueue + ) { + return getMinOffset(ctx, messageQueue, DEFAULT_TIMEOUT_MILLS); + } + CompletableFuture getMinOffset( ProxyContext ctx, MessageQueue messageQueue, long timeoutMillis ); + default CompletableFuture searchOffset( + ProxyContext ctx, + MessageQueue messageQueue, + long timestamp + ) { + return searchOffset(ctx, messageQueue, timestamp, DEFAULT_TIMEOUT_MILLS); + } + + CompletableFuture searchOffset( + ProxyContext ctx, + MessageQueue messageQueue, + long timestamp, + long timeoutMillis + ); + CompletableFuture request(ProxyContext ctx, String brokerName, RemotingCommand request, long timeoutMillis); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java index 872b16f511f1..081fcab8e810 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java @@ -46,6 +46,7 @@ import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.SearchOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader; @@ -159,10 +160,11 @@ public CompletableFuture queryConsumerOffset(ProxyContext ctx, Addressable @Override public CompletableFuture updateConsumerOffset(ProxyContext ctx, AddressableMessageQueue messageQueue, - UpdateConsumerOffsetRequestHeader requestHeader, long timeoutMillis) { - return this.mqClientAPIFactory.getClient().updateConsumerOffsetOneWay( + UpdateConsumerOffsetRequestHeader requestHeader, boolean oneWay, long timeoutMillis) { + return this.mqClientAPIFactory.getClient().updateConsumerOffset( messageQueue.getBrokerAddr(), requestHeader, + oneWay, timeoutMillis ); } @@ -207,6 +209,16 @@ public CompletableFuture getMinOffset(ProxyContext ctx, AddressableMessage ); } + @Override + public CompletableFuture searchOffset(ProxyContext ctx, AddressableMessageQueue messageQueue, + SearchOffsetRequestHeader requestHeader, long timeoutMillis) { + return this.mqClientAPIFactory.getClient().searchOffset( + messageQueue.getBrokerAddr(), + requestHeader, + timeoutMillis + ); + } + @Override public CompletableFuture request(ProxyContext ctx, String brokerName, RemotingCommand request, long timeoutMillis) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java index 115c140ffdcc..2821294234ec 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java @@ -68,6 +68,7 @@ import org.apache.rocketmq.remoting.protocol.header.PopMessageResponseHeader; import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.SearchOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageResponseHeader; import org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader; @@ -375,7 +376,7 @@ public CompletableFuture queryConsumerOffset(ProxyContext ctx, Addressable @Override public CompletableFuture updateConsumerOffset(ProxyContext ctx, AddressableMessageQueue messageQueue, - UpdateConsumerOffsetRequestHeader requestHeader, long timeoutMillis) { + UpdateConsumerOffsetRequestHeader requestHeader, boolean oneWay, long timeoutMillis) { throw new NotImplementedException("updateConsumerOffset is not implemented in LocalMessageService"); } @@ -403,6 +404,12 @@ public CompletableFuture getMinOffset(ProxyContext ctx, AddressableMessage throw new NotImplementedException("getMinOffset is not implemented in LocalMessageService"); } + @Override + public CompletableFuture searchOffset(ProxyContext ctx, AddressableMessageQueue messageQueue, + SearchOffsetRequestHeader requestHeader, long timeoutMillis) { + throw new NotImplementedException("getMinOffset is not implemented in LocalMessageService"); + } + @Override public CompletableFuture request(ProxyContext ctx, String brokerName, RemotingCommand request, long timeoutMillis) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java index 15da1715402a..7a3454e6e011 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java @@ -40,6 +40,7 @@ import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.SearchOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader; @@ -109,6 +110,7 @@ CompletableFuture updateConsumerOffset( ProxyContext ctx, AddressableMessageQueue messageQueue, UpdateConsumerOffsetRequestHeader requestHeader, + boolean oneWay, long timeoutMillis ); @@ -140,6 +142,13 @@ CompletableFuture getMinOffset( long timeoutMillis ); + CompletableFuture searchOffset( + ProxyContext ctx, + AddressableMessageQueue messageQueue, + SearchOffsetRequestHeader requestHeader, + long timeoutMillis + ); + CompletableFuture request(ProxyContext ctx, String brokerName, RemotingCommand request, long timeoutMillis); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExt.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExt.java index ec81e815cef4..36b8960ba06c 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExt.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExt.java @@ -406,15 +406,19 @@ public CompletableFuture queryConsumerOffsetWithFuture( return future; } - public CompletableFuture updateConsumerOffsetOneWay( + public CompletableFuture updateConsumerOffset( String brokerAddr, UpdateConsumerOffsetRequestHeader header, + boolean oneWay, long timeoutMillis ) { CompletableFuture future = new CompletableFuture<>(); try { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, header); - this.getRemotingClient().invokeOneway(brokerAddr, request, timeoutMillis); + if (oneWay) { + this.updateConsumerOffsetOneway(brokerAddr, header, timeoutMillis); + } else { + this.updateConsumerOffset(brokerAddr, header, timeoutMillis); + } future.complete(null); } catch (Throwable t) { future.completeExceptionally(t); diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java index d27139195094..8ded4885b8d1 100644 --- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java +++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java @@ -193,12 +193,12 @@ public static String initTopicWithName(String topicName, TopicMessageType topicM } public static String initTopicOnSampleTopicBroker(String topicName, String sampleTopic) { - IntegrationTestBase.initTopic(topicName, NAMESRV_ADDR, sampleTopic, CQType.SimpleCQ); + IntegrationTestBase.initTopic(topicName, NAMESRV_ADDR, sampleTopic, 1, CQType.SimpleCQ); return topicName; } public static String initTopicOnSampleTopicBroker(String topicName, String sampleTopic, TopicMessageType topicMessageType) { - IntegrationTestBase.initTopic(topicName, NAMESRV_ADDR, sampleTopic, topicMessageType); + IntegrationTestBase.initTopic(topicName, NAMESRV_ADDR, sampleTopic, 1, CQType.SimpleCQ, topicMessageType); return topicName; } diff --git a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/ClusterGrpcIT.java b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/ClusterGrpcIT.java index 6e3146fa594e..ac187c1c02e3 100644 --- a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/ClusterGrpcIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/ClusterGrpcIT.java @@ -111,4 +111,9 @@ public void testSimpleConsumerToDLQ() throws Exception { public void testConsumeOrderly() throws Exception { super.testConsumeOrderly(); } + + @Test + public void testPullMessage() throws Exception { + super.testPullMessage(); + } } diff --git a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java index 243c72dec55f..948e738b4662 100644 --- a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java @@ -31,14 +31,21 @@ import apache.rocketmq.v2.EndTransactionRequest; import apache.rocketmq.v2.EndTransactionResponse; import apache.rocketmq.v2.Endpoints; +import apache.rocketmq.v2.GetOffsetRequest; +import apache.rocketmq.v2.GetOffsetResponse; import apache.rocketmq.v2.HeartbeatRequest; import apache.rocketmq.v2.Message; import apache.rocketmq.v2.MessageQueue; import apache.rocketmq.v2.MessageType; import apache.rocketmq.v2.MessagingServiceGrpc; import apache.rocketmq.v2.Publishing; +import apache.rocketmq.v2.PullMessageRequest; +import apache.rocketmq.v2.PullMessageResponse; import apache.rocketmq.v2.QueryAssignmentRequest; import apache.rocketmq.v2.QueryAssignmentResponse; +import apache.rocketmq.v2.QueryOffsetPolicy; +import apache.rocketmq.v2.QueryOffsetRequest; +import apache.rocketmq.v2.QueryOffsetResponse; import apache.rocketmq.v2.QueryRouteRequest; import apache.rocketmq.v2.QueryRouteResponse; import apache.rocketmq.v2.ReceiveMessageRequest; @@ -54,6 +61,8 @@ import apache.rocketmq.v2.TelemetryCommand; import apache.rocketmq.v2.TransactionResolution; import apache.rocketmq.v2.TransactionSource; +import apache.rocketmq.v2.UpdateOffsetRequest; +import apache.rocketmq.v2.UpdateOffsetResponse; import com.google.protobuf.ByteString; import com.google.protobuf.Duration; import com.google.protobuf.util.Durations; @@ -90,6 +99,7 @@ import java.util.stream.Collectors; import javax.net.ssl.SSLException; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullStatus; @@ -540,6 +550,109 @@ public void testConsumeOrderly() throws Exception { } } + public void testPullMessage() throws Exception { + String topic = initTopicOnSampleTopicBroker(BROKER1_NAME, TopicMessageType.NORMAL); + String group = MQRandomUtils.getRandomConsumerGroup(); + + QueryRouteResponse queryRouteResponse = blockingStub.queryRoute(buildQueryRouteRequest(topic)); + assertThat(queryRouteResponse.getStatus().getCode()).isEqualTo(Code.OK); + MessageQueue messageQueue = queryRouteResponse.getMessageQueues(0); + + this.sendClientSettings(stub, buildPullConsumerClientSettings(group)).get(); + assertThat(queryMinOffset(messageQueue)).isEqualTo(0); + assertThat(queryMaxOffset(messageQueue)).isEqualTo(0); + + String messageId1 = createUniqID(); + SendMessageResponse sendResponse = blockingStub.sendMessage(buildSendMessageRequest(topic, messageId1)); + assertSendMessage(sendResponse, messageId1); + String messageId2 = createUniqID(); + sendResponse = blockingStub.sendMessage(buildSendMessageRequest(topic, messageId2)); + assertSendMessage(sendResponse, messageId2); + + assertThat(queryMinOffset(messageQueue)).isEqualTo(0); + assertThat(queryMaxOffset(messageQueue)).isEqualTo(2); + + List pullMessageResponseList = pullMessage(group, messageQueue, 0); + Pair messageOffsetAndNextOffset = assertPullMessageResponse(pullMessageResponseList, messageId1); + updateOffset(group, messageQueue, messageOffsetAndNextOffset.getLeft()); + assertThat(getOffset(group, messageQueue)).isEqualTo(messageOffsetAndNextOffset.getLeft()); + + pullMessageResponseList = pullMessage(group, messageQueue, messageOffsetAndNextOffset.getRight()); + messageOffsetAndNextOffset = assertPullMessageResponse(pullMessageResponseList, messageId2); + updateOffset(group, messageQueue, messageOffsetAndNextOffset.getLeft()); + assertThat(getOffset(group, messageQueue)).isEqualTo(messageOffsetAndNextOffset.getLeft()); + + pullMessageResponseList = pullMessage(group, messageQueue, messageOffsetAndNextOffset.getRight()); + assertThat(pullMessageResponseList.size()).isEqualTo(2); + assertThat(pullMessageResponseList.get(0).getStatus().getCode()).isEqualTo(Code.MESSAGE_NOT_FOUND); + assertThat(pullMessageResponseList.get(1).getNextOffset()).isEqualTo(messageOffsetAndNextOffset.getRight()); + } + + protected long queryMinOffset(MessageQueue messageQueue) { + QueryOffsetResponse response = blockingStub.withDeadlineAfter(3, TimeUnit.SECONDS) + .queryOffset(QueryOffsetRequest.newBuilder() + .setQueryOffsetPolicy(QueryOffsetPolicy.BEGINNING) + .setMessageQueue(messageQueue) + .build()); + assertThat(response.getStatus().getCode()).isEqualTo(Code.OK); + return response.getOffset(); + } + + protected long queryMaxOffset(MessageQueue messageQueue) { + QueryOffsetResponse response = blockingStub.withDeadlineAfter(3, TimeUnit.SECONDS) + .queryOffset(QueryOffsetRequest.newBuilder() + .setQueryOffsetPolicy(QueryOffsetPolicy.END) + .setMessageQueue(messageQueue) + .build()); + assertThat(response.getStatus().getCode()).isEqualTo(Code.OK); + return response.getOffset(); + } + + protected void updateOffset(String group, MessageQueue messageQueue, long offset) { + UpdateOffsetResponse response = blockingStub.withDeadlineAfter(3, TimeUnit.SECONDS) + .updateOffset(UpdateOffsetRequest.newBuilder() + .setGroup(Resource.newBuilder().setName(group).build()) + .setMessageQueue(messageQueue) + .setOffset(offset) + .build()); + assertThat(response.getStatus().getCode()).isEqualTo(Code.OK); + } + + protected long getOffset(String group, MessageQueue messageQueue) { + GetOffsetResponse response = blockingStub.withDeadlineAfter(3, TimeUnit.SECONDS) + .getOffset(GetOffsetRequest.newBuilder() + .setGroup(Resource.newBuilder().setName(group).build()) + .setMessageQueue(messageQueue) + .build()); + assertThat(response.getStatus().getCode()).isEqualTo(Code.OK); + return response.getOffset(); + } + + protected List pullMessage(String group, MessageQueue messageQueue, long offset) { + List responseList = new ArrayList<>(); + Iterator responseIterator = blockingStub.withDeadlineAfter(3, TimeUnit.SECONDS) + .pullMessage(PullMessageRequest.newBuilder() + .setGroup(Resource.newBuilder() + .setName(group) + .build()) + .setMessageQueue(messageQueue) + .setOffset(offset) + .setBatchSize(1) + .build()); + while (responseIterator.hasNext()) { + responseList.add(responseIterator.next()); + } + return responseList; + } + + protected Pair assertPullMessageResponse(List pullMessageResponseList, String messageId) { + assertThat(pullMessageResponseList.size()).isEqualTo(3); + assertThat(pullMessageResponseList.get(0).getStatus().getCode()).isEqualTo(Code.OK); + Message message = pullMessageResponseList.get(1).getMessage(); + assertThat(pullMessageResponseList.get(1).getMessage().getSystemProperties().getMessageId()).isEqualTo(messageId); + return Pair.of(message.getSystemProperties().getQueueOffset(), pullMessageResponseList.get(2).getNextOffset()); + } + public List receiveMessage(MessagingServiceGrpc.MessagingServiceBlockingStub stub, String topic, String group) { return receiveMessage(stub, topic, group, 15); @@ -762,6 +875,16 @@ public Endpoints buildEndpoints(int port) { .build(); } + public Settings buildPullConsumerClientSettings(String group) { + return Settings.newBuilder() + .setClientType(ClientType.PULL_CONSUMER) + .setRequestTimeout(Durations.fromSeconds(3)) + .setSubscription(Subscription.newBuilder() + .setGroup(Resource.newBuilder().setName(group).build()) + .build()) + .build(); + } + public Settings buildSimpleConsumerClientSettings(String group) { return Settings.newBuilder() .setClientType(ClientType.SIMPLE_CONSUMER)