Skip to content

Commit

Permalink
[ISSUE #6756] Implement pull APIs for proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
xdkxlk committed May 15, 2023
1 parent d9a7315 commit 9ad5975
Show file tree
Hide file tree
Showing 24 changed files with 714 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ public static PlainAccessResource parse(GeneratedMessageV3 messageV3, Authentica
if (HeartbeatRequest.getDescriptor().getFullName().equals(rpcFullName)) {
HeartbeatRequest request = (HeartbeatRequest) messageV3;
if (ClientType.PUSH_CONSUMER.equals(request.getClientType())
|| ClientType.SIMPLE_CONSUMER.equals(request.getClientType())) {
|| ClientType.SIMPLE_CONSUMER.equals(request.getClientType())
|| ClientType.PULL_CONSUMER.equals(request.getClientType())) {
if (!request.hasGroup()) {
throw new AclException("Consumer heartbeat doesn't have group");
} else {
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@
<annotations-api.version>6.0.53</annotations-api.version>
<extra-enforcer-rules.version>1.0-beta-4</extra-enforcer-rules.version>
<concurrentlinkedhashmap-lru.version>1.4.2</concurrentlinkedhashmap-lru.version>
<rocketmq-proto.version>2.0.2</rocketmq-proto.version>
<rocketmq-proto.version>2.0.3-SNAPSHOT</rocketmq-proto.version>
<grpc.version>1.50.0</grpc.version>
<protobuf.version>3.20.1</protobuf.version>
<disruptor.version>1.2.10</disruptor.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
public class ProxyUtils {

public static final int MAX_MSG_NUMS_FOR_POP_REQUEST = 32;
public static final int MAX_MSG_NUMS_FOR_PULL_REQUEST = 32;

public static final String BROKER_ADDR = "brokerAddr";
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,27 @@
import apache.rocketmq.v2.EndTransactionResponse;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse;
import apache.rocketmq.v2.GetOffsetRequest;
import apache.rocketmq.v2.GetOffsetResponse;
import apache.rocketmq.v2.HeartbeatRequest;
import apache.rocketmq.v2.HeartbeatResponse;
import apache.rocketmq.v2.NotifyClientTerminationRequest;
import apache.rocketmq.v2.NotifyClientTerminationResponse;
import apache.rocketmq.v2.PullMessageRequest;
import apache.rocketmq.v2.PullMessageResponse;
import apache.rocketmq.v2.QueryAssignmentRequest;
import apache.rocketmq.v2.QueryAssignmentResponse;
import apache.rocketmq.v2.QueryOffsetRequest;
import apache.rocketmq.v2.QueryOffsetResponse;
import apache.rocketmq.v2.QueryRouteRequest;
import apache.rocketmq.v2.QueryRouteResponse;
import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.ReceiveMessageResponse;
import apache.rocketmq.v2.SendMessageRequest;
import apache.rocketmq.v2.SendMessageResponse;
import apache.rocketmq.v2.TelemetryCommand;
import apache.rocketmq.v2.UpdateOffsetRequest;
import apache.rocketmq.v2.UpdateOffsetResponse;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.constant.LoggerName;
Expand All @@ -49,6 +57,7 @@
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
import org.apache.rocketmq.proxy.grpc.v2.consumer.AckMessageActivity;
import org.apache.rocketmq.proxy.grpc.v2.consumer.ChangeInvisibleDurationActivity;
import org.apache.rocketmq.proxy.grpc.v2.consumer.PullMessageActivity;
import org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageActivity;
import org.apache.rocketmq.proxy.grpc.v2.producer.ForwardMessageToDLQActivity;
import org.apache.rocketmq.proxy.grpc.v2.producer.SendMessageActivity;
Expand All @@ -64,6 +73,7 @@ public class DefaultGrpcMessingActivity extends AbstractStartAndShutdown impleme
protected GrpcChannelManager grpcChannelManager;
protected ReceiptHandleProcessor receiptHandleProcessor;
protected ReceiveMessageActivity receiveMessageActivity;
protected PullMessageActivity pullMessageActivity;
protected AckMessageActivity ackMessageActivity;
protected ChangeInvisibleDurationActivity changeInvisibleDurationActivity;
protected SendMessageActivity sendMessageActivity;
Expand All @@ -82,6 +92,7 @@ protected void init(MessagingProcessor messagingProcessor) {
this.receiptHandleProcessor = new ReceiptHandleProcessor(messagingProcessor);

this.receiveMessageActivity = new ReceiveMessageActivity(messagingProcessor, receiptHandleProcessor, grpcClientSettingsManager, grpcChannelManager);
this.pullMessageActivity = new PullMessageActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager);
this.ackMessageActivity = new AckMessageActivity(messagingProcessor, receiptHandleProcessor, grpcClientSettingsManager, grpcChannelManager);
this.changeInvisibleDurationActivity = new ChangeInvisibleDurationActivity(messagingProcessor, receiptHandleProcessor, grpcClientSettingsManager, grpcChannelManager);
this.sendMessageActivity = new SendMessageActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager);
Expand Down Expand Up @@ -149,6 +160,27 @@ public CompletableFuture<ChangeInvisibleDurationResponse> changeInvisibleDuratio
return this.changeInvisibleDurationActivity.changeInvisibleDuration(ctx, request);
}

@Override
public void pullMessage(ProxyContext ctx, PullMessageRequest request,
StreamObserver<PullMessageResponse> responseObserver) {
this.pullMessageActivity.pullMessage(ctx, request, responseObserver);
}

@Override
public CompletableFuture<UpdateOffsetResponse> updateOffset(ProxyContext ctx, UpdateOffsetRequest request) {
return this.pullMessageActivity.updateOffset(ctx, request);
}

@Override
public CompletableFuture<GetOffsetResponse> getOffset(ProxyContext ctx, GetOffsetRequest request) {
return this.pullMessageActivity.getOffset(ctx, request);
}

@Override
public CompletableFuture<QueryOffsetResponse> queryOffset(ProxyContext ctx, QueryOffsetRequest request) {
return this.pullMessageActivity.queryOffset(ctx, request);
}

@Override
public StreamObserver<TelemetryCommand> telemetry(ProxyContext ctx,
StreamObserver<TelemetryCommand> responseObserver) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,19 @@
import apache.rocketmq.v2.EndTransactionResponse;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse;
import apache.rocketmq.v2.GetOffsetRequest;
import apache.rocketmq.v2.GetOffsetResponse;
import apache.rocketmq.v2.HeartbeatRequest;
import apache.rocketmq.v2.HeartbeatResponse;
import apache.rocketmq.v2.MessagingServiceGrpc;
import apache.rocketmq.v2.NotifyClientTerminationRequest;
import apache.rocketmq.v2.NotifyClientTerminationResponse;
import apache.rocketmq.v2.PullMessageRequest;
import apache.rocketmq.v2.PullMessageResponse;
import apache.rocketmq.v2.QueryAssignmentRequest;
import apache.rocketmq.v2.QueryAssignmentResponse;
import apache.rocketmq.v2.QueryOffsetRequest;
import apache.rocketmq.v2.QueryOffsetResponse;
import apache.rocketmq.v2.QueryRouteRequest;
import apache.rocketmq.v2.QueryRouteResponse;
import apache.rocketmq.v2.ReceiveMessageRequest;
Expand All @@ -41,6 +47,8 @@
import apache.rocketmq.v2.SendMessageResponse;
import apache.rocketmq.v2.Status;
import apache.rocketmq.v2.TelemetryCommand;
import apache.rocketmq.v2.UpdateOffsetRequest;
import apache.rocketmq.v2.UpdateOffsetResponse;
import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.stub.StreamObserver;
Expand Down Expand Up @@ -373,6 +381,77 @@ public void changeInvisibleDuration(ChangeInvisibleDurationRequest request,
}
}

@Override
public void pullMessage(PullMessageRequest request, StreamObserver<PullMessageResponse> responseObserver) {
Function<Status, PullMessageResponse> statusResponseCreator = status -> PullMessageResponse.newBuilder().setStatus(status).build();
ProxyContext context = createContext();
try {
validateContext(context);
this.addExecutor(this.consumerThreadPoolExecutor,
context,
request,
() -> grpcMessingActivity.pullMessage(context, request, responseObserver),
responseObserver,
statusResponseCreator);
} catch (Throwable t) {
writeResponse(context, request, null, responseObserver, t, statusResponseCreator);
}
}

@Override
public void updateOffset(UpdateOffsetRequest request, StreamObserver<UpdateOffsetResponse> responseObserver) {
Function<Status, UpdateOffsetResponse> statusResponseCreator = status -> UpdateOffsetResponse.newBuilder().setStatus(status).build();
ProxyContext context = createContext();
try {
validateContext(context);
this.addExecutor(this.consumerThreadPoolExecutor,
context,
request,
() -> grpcMessingActivity.updateOffset(context, request)
.whenComplete((response, throwable) -> writeResponse(context, request, response, responseObserver, throwable, statusResponseCreator)),
responseObserver,
statusResponseCreator);
} catch (Throwable t) {
writeResponse(context, request, null, responseObserver, t, statusResponseCreator);
}
}

@Override
public void getOffset(GetOffsetRequest request, StreamObserver<GetOffsetResponse> responseObserver) {
Function<Status, GetOffsetResponse> statusResponseCreator = status -> GetOffsetResponse.newBuilder().setStatus(status).build();
ProxyContext context = createContext();
try {
validateContext(context);
this.addExecutor(this.consumerThreadPoolExecutor,
context,
request,
() -> grpcMessingActivity.getOffset(context, request)
.whenComplete((response, throwable) -> writeResponse(context, request, response, responseObserver, throwable, statusResponseCreator)),
responseObserver,
statusResponseCreator);
} catch (Throwable t) {
writeResponse(context, request, null, responseObserver, t, statusResponseCreator);
}
}

@Override
public void queryOffset(QueryOffsetRequest request, StreamObserver<QueryOffsetResponse> responseObserver) {
Function<Status, QueryOffsetResponse> statusResponseCreator = status -> QueryOffsetResponse.newBuilder().setStatus(status).build();
ProxyContext context = createContext();
try {
validateContext(context);
this.addExecutor(this.consumerThreadPoolExecutor,
context,
request,
() -> grpcMessingActivity.queryOffset(context, request)
.whenComplete((response, throwable) -> writeResponse(context, request, response, responseObserver, throwable, statusResponseCreator)),
responseObserver,
statusResponseCreator);
} catch (Throwable t) {
writeResponse(context, request, null, responseObserver, t, statusResponseCreator);
}
}

@Override
public StreamObserver<TelemetryCommand> telemetry(StreamObserver<TelemetryCommand> responseObserver) {
Function<Status, TelemetryCommand> statusResponseCreator = status -> TelemetryCommand.newBuilder().setStatus(status).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,27 @@
import apache.rocketmq.v2.EndTransactionResponse;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse;
import apache.rocketmq.v2.GetOffsetRequest;
import apache.rocketmq.v2.GetOffsetResponse;
import apache.rocketmq.v2.HeartbeatRequest;
import apache.rocketmq.v2.HeartbeatResponse;
import apache.rocketmq.v2.NotifyClientTerminationRequest;
import apache.rocketmq.v2.NotifyClientTerminationResponse;
import apache.rocketmq.v2.PullMessageRequest;
import apache.rocketmq.v2.PullMessageResponse;
import apache.rocketmq.v2.QueryAssignmentRequest;
import apache.rocketmq.v2.QueryAssignmentResponse;
import apache.rocketmq.v2.QueryOffsetRequest;
import apache.rocketmq.v2.QueryOffsetResponse;
import apache.rocketmq.v2.QueryRouteRequest;
import apache.rocketmq.v2.QueryRouteResponse;
import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.ReceiveMessageResponse;
import apache.rocketmq.v2.SendMessageRequest;
import apache.rocketmq.v2.SendMessageResponse;
import apache.rocketmq.v2.TelemetryCommand;
import apache.rocketmq.v2.UpdateOffsetRequest;
import apache.rocketmq.v2.UpdateOffsetResponse;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.proxy.common.ProxyContext;
Expand Down Expand Up @@ -69,5 +77,14 @@ CompletableFuture<NotifyClientTerminationResponse> notifyClientTermination(Proxy
CompletableFuture<ChangeInvisibleDurationResponse> changeInvisibleDuration(ProxyContext ctx,
ChangeInvisibleDurationRequest request);

void pullMessage(ProxyContext ctx, PullMessageRequest request,
StreamObserver<PullMessageResponse> responseObserver);

CompletableFuture<UpdateOffsetResponse> updateOffset(ProxyContext ctx, UpdateOffsetRequest request);

CompletableFuture<GetOffsetResponse> getOffset(ProxyContext ctx, GetOffsetRequest request);

CompletableFuture<QueryOffsetResponse> queryOffset(ProxyContext ctx, QueryOffsetRequest request);

StreamObserver<TelemetryCommand> telemetry(ProxyContext ctx, StreamObserver<TelemetryCommand> responseObserver);
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public CompletableFuture<HeartbeatResponse> heartbeat(ProxyContext ctx, Heartbea
}
break;
}
case PULL_CONSUMER:
case PUSH_CONSUMER:
case SIMPLE_CONSUMER: {
validateConsumerGroup(request.getGroup());
Expand Down Expand Up @@ -149,6 +150,7 @@ public CompletableFuture<NotifyClientTerminationResponse> notifyClientTerminatio
}
}
break;
case PULL_CONSUMER:
case PUSH_CONSUMER:
case SIMPLE_CONSUMER:
validateConsumerGroup(request.getGroup());
Expand Down Expand Up @@ -385,9 +387,11 @@ protected ConsumeMessageDirectlyResult buildConsumeMessageDirectlyResult(Status
protected ConsumeType buildConsumeType(ClientType clientType) {
switch (clientType) {
case SIMPLE_CONSUMER:
return ConsumeType.CONSUME_ACTIVELY;
return ConsumeType.CONSUME_POP;
case PUSH_CONSUMER:
return ConsumeType.CONSUME_PASSIVELY;
case PULL_CONSUMER:
return ConsumeType.CONSUME_ACTIVELY;
default:
throw new IllegalArgumentException("Client type is not consumer, type: " + clientType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import apache.rocketmq.v2.Address;
import apache.rocketmq.v2.AddressScheme;
import apache.rocketmq.v2.ClientType;
import apache.rocketmq.v2.CustomizedBackoff;
import apache.rocketmq.v2.Endpoints;
import apache.rocketmq.v2.ExponentialBackoff;
Expand Down Expand Up @@ -233,7 +232,7 @@ protected void onWaitEnd() {
for (String clientId : clientIdSet) {
try {
CLIENT_SETTINGS_MAP.computeIfPresent(clientId, (clientIdKey, settings) -> {
if (!settings.getClientType().equals(ClientType.PUSH_CONSUMER) && !settings.getClientType().equals(ClientType.SIMPLE_CONSUMER)) {
if (!GrpcValidator.getInstance().isConsumer(settings.getClientType())) {
return settings;
}
String consumerGroup = GrpcConverter.getInstance().wrapResourceWithNamespace(settings.getSubscription().getGroup());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
package org.apache.rocketmq.proxy.grpc.v2.common;

import apache.rocketmq.v2.Broker;
import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.DeadLetterQueue;
import apache.rocketmq.v2.Digest;
import apache.rocketmq.v2.DigestType;
import apache.rocketmq.v2.Encoding;
import apache.rocketmq.v2.FilterExpression;
import apache.rocketmq.v2.FilterType;
import apache.rocketmq.v2.Message;
import apache.rocketmq.v2.MessageQueue;
Expand All @@ -46,6 +48,8 @@
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;

public class GrpcConverter {
private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
Expand Down Expand Up @@ -86,6 +90,15 @@ public MessageQueue buildMessageQueue(MessageExt messageExt, String brokerName)
.build();
}

public SubscriptionData buildSubscriptionData(String rocketmqTopic, FilterExpression filterExpression) {
try {
return FilterAPI.build(rocketmqTopic, filterExpression.getExpression(),
GrpcConverter.getInstance().buildExpressionType(filterExpression.getType()));
} catch (Exception e) {
throw new GrpcProxyException(Code.ILLEGAL_FILTER_EXPRESSION, e.getMessage());
}
}

public String buildExpressionType(FilterType filterType) {
switch (filterType) {
case SQL:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.rocketmq.proxy.grpc.v2.common;

import apache.rocketmq.v2.ClientType;
import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.Resource;
import com.google.common.base.CharMatcher;
Expand All @@ -29,10 +30,12 @@
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;

public class GrpcValidator {
protected static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);

protected static final String ILLEGAL_POLLING_TIME_INTRODUCED_CLIENT_VERSION = "5.0.3";
protected static final Object INSTANCE_CREATE_LOCK = new Object();
protected static volatile GrpcValidator instance;

Expand All @@ -47,6 +50,36 @@ public static GrpcValidator getInstance() {
return instance;
}

public boolean isConsumer(ClientType clientType) {
return ClientType.SIMPLE_CONSUMER.equals(clientType) ||
ClientType.PUSH_CONSUMER.equals(clientType) ||
ClientType.PULL_CONSUMER.equals(clientType);
}

public long reasonableLongPollingTimeout(Long longPollingTimeout, Long timeRemaining, String clientVersion) {
long pollingTime = longPollingTimeout;
ProxyConfig config = ConfigurationManager.getProxyConfig();
if (pollingTime < config.getGrpcClientConsumerMinLongPollingTimeoutMillis()) {
pollingTime = config.getGrpcClientConsumerMinLongPollingTimeoutMillis();
}
if (pollingTime > config.getGrpcClientConsumerMaxLongPollingTimeoutMillis()) {
pollingTime = config.getGrpcClientConsumerMaxLongPollingTimeoutMillis();
}

if (pollingTime > timeRemaining) {
if (timeRemaining >= config.getGrpcClientConsumerMinLongPollingTimeoutMillis()) {
pollingTime = timeRemaining;
} else {
Code code =
null == clientVersion || ILLEGAL_POLLING_TIME_INTRODUCED_CLIENT_VERSION.compareTo(clientVersion) > 0 ?
Code.BAD_REQUEST : Code.ILLEGAL_POLLING_TIME;
throw new GrpcProxyException(code, "The deadline time remaining is not enough" +
" for polling, please check network condition");
}
}
return pollingTime;
}

public void validateTopic(Resource topic) {
validateTopic(GrpcConverter.getInstance().wrapResourceWithNamespace(topic));
}
Expand Down
Loading

0 comments on commit 9ad5975

Please sign in to comment.