Skip to content

Commit

Permalink
[ISSUE apache#6756] Implement pull APIs for proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
xdkxlk committed May 15, 2023
1 parent 5dc2e20 commit 5a8b986
Show file tree
Hide file tree
Showing 24 changed files with 714 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ public static PlainAccessResource parse(GeneratedMessageV3 messageV3, Authentica
if (HeartbeatRequest.getDescriptor().getFullName().equals(rpcFullName)) {
HeartbeatRequest request = (HeartbeatRequest) messageV3;
if (ClientType.PUSH_CONSUMER.equals(request.getClientType())
|| ClientType.SIMPLE_CONSUMER.equals(request.getClientType())) {
|| ClientType.SIMPLE_CONSUMER.equals(request.getClientType())
|| ClientType.PULL_CONSUMER.equals(request.getClientType())) {
if (!request.hasGroup()) {
throw new AclException("Consumer heartbeat doesn't have group");
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,15 +410,19 @@ public CompletableFuture<Long> queryConsumerOffsetWithFuture(
return future;
}

public CompletableFuture<Void> updateConsumerOffsetOneWay(
public CompletableFuture<Void> updateConsumerOffset(
String brokerAddr,
UpdateConsumerOffsetRequestHeader header,
boolean oneWay,
long timeoutMillis
) {
CompletableFuture<Void> future = new CompletableFuture<>();
try {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, header);
this.getRemotingClient().invokeOneway(brokerAddr, request, timeoutMillis);
if (oneWay) {
this.updateConsumerOffsetOneway(brokerAddr, header, timeoutMillis);
} else {
this.updateConsumerOffset(brokerAddr, header, timeoutMillis);
}
future.complete(null);
} catch (Throwable t) {
future.completeExceptionally(t);
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@
<annotations-api.version>6.0.53</annotations-api.version>
<extra-enforcer-rules.version>1.0-beta-4</extra-enforcer-rules.version>
<concurrentlinkedhashmap-lru.version>1.4.2</concurrentlinkedhashmap-lru.version>
<rocketmq-proto.version>2.0.2</rocketmq-proto.version>
<rocketmq-proto.version>2.0.3-SNAPSHOT</rocketmq-proto.version>
<grpc.version>1.50.0</grpc.version>
<protobuf.version>3.20.1</protobuf.version>
<disruptor.version>1.2.10</disruptor.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
public class ProxyUtils {

public static final int MAX_MSG_NUMS_FOR_POP_REQUEST = 32;
public static final int MAX_MSG_NUMS_FOR_PULL_REQUEST = 32;

public static final String BROKER_ADDR = "brokerAddr";
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,27 @@
import apache.rocketmq.v2.EndTransactionResponse;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse;
import apache.rocketmq.v2.GetOffsetRequest;
import apache.rocketmq.v2.GetOffsetResponse;
import apache.rocketmq.v2.HeartbeatRequest;
import apache.rocketmq.v2.HeartbeatResponse;
import apache.rocketmq.v2.NotifyClientTerminationRequest;
import apache.rocketmq.v2.NotifyClientTerminationResponse;
import apache.rocketmq.v2.PullMessageRequest;
import apache.rocketmq.v2.PullMessageResponse;
import apache.rocketmq.v2.QueryAssignmentRequest;
import apache.rocketmq.v2.QueryAssignmentResponse;
import apache.rocketmq.v2.QueryOffsetRequest;
import apache.rocketmq.v2.QueryOffsetResponse;
import apache.rocketmq.v2.QueryRouteRequest;
import apache.rocketmq.v2.QueryRouteResponse;
import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.ReceiveMessageResponse;
import apache.rocketmq.v2.SendMessageRequest;
import apache.rocketmq.v2.SendMessageResponse;
import apache.rocketmq.v2.TelemetryCommand;
import apache.rocketmq.v2.UpdateOffsetRequest;
import apache.rocketmq.v2.UpdateOffsetResponse;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.constant.LoggerName;
Expand All @@ -49,6 +57,7 @@
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
import org.apache.rocketmq.proxy.grpc.v2.consumer.AckMessageActivity;
import org.apache.rocketmq.proxy.grpc.v2.consumer.ChangeInvisibleDurationActivity;
import org.apache.rocketmq.proxy.grpc.v2.consumer.PullMessageActivity;
import org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageActivity;
import org.apache.rocketmq.proxy.grpc.v2.producer.ForwardMessageToDLQActivity;
import org.apache.rocketmq.proxy.grpc.v2.producer.SendMessageActivity;
Expand All @@ -64,6 +73,7 @@ public class DefaultGrpcMessingActivity extends AbstractStartAndShutdown impleme
protected GrpcChannelManager grpcChannelManager;
protected ReceiptHandleProcessor receiptHandleProcessor;
protected ReceiveMessageActivity receiveMessageActivity;
protected PullMessageActivity pullMessageActivity;
protected AckMessageActivity ackMessageActivity;
protected ChangeInvisibleDurationActivity changeInvisibleDurationActivity;
protected SendMessageActivity sendMessageActivity;
Expand All @@ -82,6 +92,7 @@ protected void init(MessagingProcessor messagingProcessor) {
this.receiptHandleProcessor = new ReceiptHandleProcessor(messagingProcessor);

this.receiveMessageActivity = new ReceiveMessageActivity(messagingProcessor, receiptHandleProcessor, grpcClientSettingsManager, grpcChannelManager);
this.pullMessageActivity = new PullMessageActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager);
this.ackMessageActivity = new AckMessageActivity(messagingProcessor, receiptHandleProcessor, grpcClientSettingsManager, grpcChannelManager);
this.changeInvisibleDurationActivity = new ChangeInvisibleDurationActivity(messagingProcessor, receiptHandleProcessor, grpcClientSettingsManager, grpcChannelManager);
this.sendMessageActivity = new SendMessageActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager);
Expand Down Expand Up @@ -149,6 +160,27 @@ public CompletableFuture<ChangeInvisibleDurationResponse> changeInvisibleDuratio
return this.changeInvisibleDurationActivity.changeInvisibleDuration(ctx, request);
}

@Override
public void pullMessage(ProxyContext ctx, PullMessageRequest request,
StreamObserver<PullMessageResponse> responseObserver) {
this.pullMessageActivity.pullMessage(ctx, request, responseObserver);
}

@Override
public CompletableFuture<UpdateOffsetResponse> updateOffset(ProxyContext ctx, UpdateOffsetRequest request) {
return this.pullMessageActivity.updateOffset(ctx, request);
}

@Override
public CompletableFuture<GetOffsetResponse> getOffset(ProxyContext ctx, GetOffsetRequest request) {
return this.pullMessageActivity.getOffset(ctx, request);
}

@Override
public CompletableFuture<QueryOffsetResponse> queryOffset(ProxyContext ctx, QueryOffsetRequest request) {
return this.pullMessageActivity.queryOffset(ctx, request);
}

@Override
public StreamObserver<TelemetryCommand> telemetry(ProxyContext ctx,
StreamObserver<TelemetryCommand> responseObserver) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,19 @@
import apache.rocketmq.v2.EndTransactionResponse;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse;
import apache.rocketmq.v2.GetOffsetRequest;
import apache.rocketmq.v2.GetOffsetResponse;
import apache.rocketmq.v2.HeartbeatRequest;
import apache.rocketmq.v2.HeartbeatResponse;
import apache.rocketmq.v2.MessagingServiceGrpc;
import apache.rocketmq.v2.NotifyClientTerminationRequest;
import apache.rocketmq.v2.NotifyClientTerminationResponse;
import apache.rocketmq.v2.PullMessageRequest;
import apache.rocketmq.v2.PullMessageResponse;
import apache.rocketmq.v2.QueryAssignmentRequest;
import apache.rocketmq.v2.QueryAssignmentResponse;
import apache.rocketmq.v2.QueryOffsetRequest;
import apache.rocketmq.v2.QueryOffsetResponse;
import apache.rocketmq.v2.QueryRouteRequest;
import apache.rocketmq.v2.QueryRouteResponse;
import apache.rocketmq.v2.ReceiveMessageRequest;
Expand All @@ -41,6 +47,8 @@
import apache.rocketmq.v2.SendMessageResponse;
import apache.rocketmq.v2.Status;
import apache.rocketmq.v2.TelemetryCommand;
import apache.rocketmq.v2.UpdateOffsetRequest;
import apache.rocketmq.v2.UpdateOffsetResponse;
import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.stub.StreamObserver;
Expand Down Expand Up @@ -375,6 +383,77 @@ public void changeInvisibleDuration(ChangeInvisibleDurationRequest request,
}
}

@Override
public void pullMessage(PullMessageRequest request, StreamObserver<PullMessageResponse> responseObserver) {
Function<Status, PullMessageResponse> statusResponseCreator = status -> PullMessageResponse.newBuilder().setStatus(status).build();
ProxyContext context = createContext();
try {
validateContext(context);
this.addExecutor(this.consumerThreadPoolExecutor,
context,
request,
() -> grpcMessingActivity.pullMessage(context, request, responseObserver),
responseObserver,
statusResponseCreator);
} catch (Throwable t) {
writeResponse(context, request, null, responseObserver, t, statusResponseCreator);
}
}

@Override
public void updateOffset(UpdateOffsetRequest request, StreamObserver<UpdateOffsetResponse> responseObserver) {
Function<Status, UpdateOffsetResponse> statusResponseCreator = status -> UpdateOffsetResponse.newBuilder().setStatus(status).build();
ProxyContext context = createContext();
try {
validateContext(context);
this.addExecutor(this.consumerThreadPoolExecutor,
context,
request,
() -> grpcMessingActivity.updateOffset(context, request)
.whenComplete((response, throwable) -> writeResponse(context, request, response, responseObserver, throwable, statusResponseCreator)),
responseObserver,
statusResponseCreator);
} catch (Throwable t) {
writeResponse(context, request, null, responseObserver, t, statusResponseCreator);
}
}

@Override
public void getOffset(GetOffsetRequest request, StreamObserver<GetOffsetResponse> responseObserver) {
Function<Status, GetOffsetResponse> statusResponseCreator = status -> GetOffsetResponse.newBuilder().setStatus(status).build();
ProxyContext context = createContext();
try {
validateContext(context);
this.addExecutor(this.consumerThreadPoolExecutor,
context,
request,
() -> grpcMessingActivity.getOffset(context, request)
.whenComplete((response, throwable) -> writeResponse(context, request, response, responseObserver, throwable, statusResponseCreator)),
responseObserver,
statusResponseCreator);
} catch (Throwable t) {
writeResponse(context, request, null, responseObserver, t, statusResponseCreator);
}
}

@Override
public void queryOffset(QueryOffsetRequest request, StreamObserver<QueryOffsetResponse> responseObserver) {
Function<Status, QueryOffsetResponse> statusResponseCreator = status -> QueryOffsetResponse.newBuilder().setStatus(status).build();
ProxyContext context = createContext();
try {
validateContext(context);
this.addExecutor(this.consumerThreadPoolExecutor,
context,
request,
() -> grpcMessingActivity.queryOffset(context, request)
.whenComplete((response, throwable) -> writeResponse(context, request, response, responseObserver, throwable, statusResponseCreator)),
responseObserver,
statusResponseCreator);
} catch (Throwable t) {
writeResponse(context, request, null, responseObserver, t, statusResponseCreator);
}
}

@Override
public StreamObserver<TelemetryCommand> telemetry(StreamObserver<TelemetryCommand> responseObserver) {
Function<Status, TelemetryCommand> statusResponseCreator = status -> TelemetryCommand.newBuilder().setStatus(status).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,27 @@
import apache.rocketmq.v2.EndTransactionResponse;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse;
import apache.rocketmq.v2.GetOffsetRequest;
import apache.rocketmq.v2.GetOffsetResponse;
import apache.rocketmq.v2.HeartbeatRequest;
import apache.rocketmq.v2.HeartbeatResponse;
import apache.rocketmq.v2.NotifyClientTerminationRequest;
import apache.rocketmq.v2.NotifyClientTerminationResponse;
import apache.rocketmq.v2.PullMessageRequest;
import apache.rocketmq.v2.PullMessageResponse;
import apache.rocketmq.v2.QueryAssignmentRequest;
import apache.rocketmq.v2.QueryAssignmentResponse;
import apache.rocketmq.v2.QueryOffsetRequest;
import apache.rocketmq.v2.QueryOffsetResponse;
import apache.rocketmq.v2.QueryRouteRequest;
import apache.rocketmq.v2.QueryRouteResponse;
import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.ReceiveMessageResponse;
import apache.rocketmq.v2.SendMessageRequest;
import apache.rocketmq.v2.SendMessageResponse;
import apache.rocketmq.v2.TelemetryCommand;
import apache.rocketmq.v2.UpdateOffsetRequest;
import apache.rocketmq.v2.UpdateOffsetResponse;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.proxy.common.ProxyContext;
Expand Down Expand Up @@ -69,5 +77,14 @@ CompletableFuture<NotifyClientTerminationResponse> notifyClientTermination(Proxy
CompletableFuture<ChangeInvisibleDurationResponse> changeInvisibleDuration(ProxyContext ctx,
ChangeInvisibleDurationRequest request);

void pullMessage(ProxyContext ctx, PullMessageRequest request,
StreamObserver<PullMessageResponse> responseObserver);

CompletableFuture<UpdateOffsetResponse> updateOffset(ProxyContext ctx, UpdateOffsetRequest request);

CompletableFuture<GetOffsetResponse> getOffset(ProxyContext ctx, GetOffsetRequest request);

CompletableFuture<QueryOffsetResponse> queryOffset(ProxyContext ctx, QueryOffsetRequest request);

StreamObserver<TelemetryCommand> telemetry(ProxyContext ctx, StreamObserver<TelemetryCommand> responseObserver);
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public CompletableFuture<HeartbeatResponse> heartbeat(ProxyContext ctx, Heartbea
}
break;
}
case PULL_CONSUMER:
case PUSH_CONSUMER:
case SIMPLE_CONSUMER: {
validateConsumerGroup(request.getGroup());
Expand Down Expand Up @@ -149,6 +150,7 @@ public CompletableFuture<NotifyClientTerminationResponse> notifyClientTerminatio
}
}
break;
case PULL_CONSUMER:
case PUSH_CONSUMER:
case SIMPLE_CONSUMER:
validateConsumerGroup(request.getGroup());
Expand Down Expand Up @@ -385,9 +387,11 @@ protected ConsumeMessageDirectlyResult buildConsumeMessageDirectlyResult(Status
protected ConsumeType buildConsumeType(ClientType clientType) {
switch (clientType) {
case SIMPLE_CONSUMER:
return ConsumeType.CONSUME_ACTIVELY;
return ConsumeType.CONSUME_POP;
case PUSH_CONSUMER:
return ConsumeType.CONSUME_PASSIVELY;
case PULL_CONSUMER:
return ConsumeType.CONSUME_ACTIVELY;
default:
throw new IllegalArgumentException("Client type is not consumer, type: " + clientType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import apache.rocketmq.v2.Address;
import apache.rocketmq.v2.AddressScheme;
import apache.rocketmq.v2.ClientType;
import apache.rocketmq.v2.CustomizedBackoff;
import apache.rocketmq.v2.Endpoints;
import apache.rocketmq.v2.ExponentialBackoff;
Expand Down Expand Up @@ -233,7 +232,7 @@ protected void onWaitEnd() {
for (String clientId : clientIdSet) {
try {
CLIENT_SETTINGS_MAP.computeIfPresent(clientId, (clientIdKey, settings) -> {
if (!settings.getClientType().equals(ClientType.PUSH_CONSUMER) && !settings.getClientType().equals(ClientType.SIMPLE_CONSUMER)) {
if (!GrpcValidator.getInstance().isConsumer(settings.getClientType())) {
return settings;
}
String consumerGroup = GrpcConverter.getInstance().wrapResourceWithNamespace(settings.getSubscription().getGroup());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
package org.apache.rocketmq.proxy.grpc.v2.common;

import apache.rocketmq.v2.Broker;
import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.DeadLetterQueue;
import apache.rocketmq.v2.Digest;
import apache.rocketmq.v2.DigestType;
import apache.rocketmq.v2.Encoding;
import apache.rocketmq.v2.FilterExpression;
import apache.rocketmq.v2.FilterType;
import apache.rocketmq.v2.Message;
import apache.rocketmq.v2.MessageQueue;
Expand All @@ -46,6 +48,8 @@
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;

public class GrpcConverter {
private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
Expand Down Expand Up @@ -86,6 +90,15 @@ public MessageQueue buildMessageQueue(MessageExt messageExt, String brokerName)
.build();
}

public SubscriptionData buildSubscriptionData(String rocketmqTopic, FilterExpression filterExpression) {
try {
return FilterAPI.build(rocketmqTopic, filterExpression.getExpression(),
GrpcConverter.getInstance().buildExpressionType(filterExpression.getType()));
} catch (Exception e) {
throw new GrpcProxyException(Code.ILLEGAL_FILTER_EXPRESSION, e.getMessage());
}
}

public String buildExpressionType(FilterType filterType) {
switch (filterType) {
case SQL:
Expand Down
Loading

0 comments on commit 5a8b986

Please sign in to comment.