Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump rocketmq-proto to 2.0.2 #364

Merged
merged 1 commit into from
Feb 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -242,17 +242,19 @@ private apache.rocketmq.v2.FilterExpression wrapFilterExpression(FilterExpressio
}

ReceiveMessageRequest wrapReceiveMessageRequest(int batchSize, MessageQueueImpl mq,
FilterExpression filterExpression) {
FilterExpression filterExpression, Duration longPollingTimeout) {
return ReceiveMessageRequest.newBuilder().setGroup(getProtobufGroup())
.setMessageQueue(mq.toProtobuf()).setFilterExpression(wrapFilterExpression(filterExpression))
.setLongPollingTimeout(Durations.fromNanos(longPollingTimeout.toNanos()))
.setBatchSize(batchSize).setAutoRenew(true).build();
}

ReceiveMessageRequest wrapReceiveMessageRequest(int batchSize, MessageQueueImpl mq,
FilterExpression filterExpression, Duration invisibleDuration) {
FilterExpression filterExpression, Duration invisibleDuration, Duration longPollingTimeout) {
final com.google.protobuf.Duration duration = Durations.fromNanos(invisibleDuration.toNanos());
return ReceiveMessageRequest.newBuilder().setGroup(getProtobufGroup())
.setMessageQueue(mq.toProtobuf()).setFilterExpression(wrapFilterExpression(filterExpression))
.setLongPollingTimeout(Durations.fromNanos(longPollingTimeout.toNanos()))
.setBatchSize(batchSize).setAutoRenew(false).setInvisibleDuration(duration).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -219,15 +219,17 @@ private void receiveMessageImmediately() {
try {
final Endpoints endpoints = mq.getBroker().getEndpoints();
final int batchSize = this.getReceptionBatchSize();
final ReceiveMessageRequest request = consumer.wrapReceiveMessageRequest(batchSize, mq, filterExpression);
final Duration longPollingTimeout = consumer.getPushConsumerSettings().getLongPollingTimeout();
final ReceiveMessageRequest request = consumer.wrapReceiveMessageRequest(batchSize, mq, filterExpression,
longPollingTimeout);
activityNanoTime = System.nanoTime();

// Intercept before message reception.
final MessageInterceptorContextImpl context = new MessageInterceptorContextImpl(MessageHookPoints.RECEIVE);
consumer.doBefore(context, Collections.emptyList());

final ListenableFuture<ReceiveMessageResult> future = consumer.receiveMessage(request, mq,
consumer.getPushConsumerSettings().getLongPollingTimeout());
longPollingTimeout);
Futures.addCallback(future, new FutureCallback<ReceiveMessageResult>() {
@Override
public void onSuccess(ReceiveMessageResult result) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public ListenableFuture<List<MessageView>> receive0(int maxMessageNum, Duration
final ListenableFuture<ReceiveMessageResult> future0 = Futures.transformAsync(routeFuture, result -> {
final MessageQueueImpl mq = result.takeMessageQueue();
final ReceiveMessageRequest request = wrapReceiveMessageRequest(maxMessageNum, mq, filterExpression,
invisibleDuration);
invisibleDuration, awaitDuration);
return receiveMessage(request, mq, awaitDuration);
}, MoreExecutors.directExecutor());
return Futures.transformAsync(future0, result -> Futures.immediateFuture(result.getMessageViews()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public void testReceiveMessageImmediately() {
when(pushSubscriptionSettings.getReceiveBatchSize()).thenReturn(32);
ReceiveMessageRequest request = ReceiveMessageRequest.newBuilder().build();
when(pushConsumer.wrapReceiveMessageRequest(anyInt(), any(MessageQueueImpl.class),
any(FilterExpression.class))).thenReturn(request);
any(FilterExpression.class), any(Duration.class))).thenReturn(request);
processQueue.fetchMessageImmediately();
await().atMost(Duration.ofSeconds(3))
.untilAsserted(() -> verify(pushConsumer, times(cachedMessagesCountThresholdPerQueue))
Expand Down
2 changes: 1 addition & 1 deletion java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
~ 1. Whether it is essential, because the current shaded jar is fat enough.
~ 2. Make sure that it is compatible with Java 8.
-->
<rocketmq-proto.version>2.0.1</rocketmq-proto.version>
<rocketmq-proto.version>2.0.2</rocketmq-proto.version>
<annotations-api.version>6.0.53</annotations-api.version>
<protobuf.version>3.21.7</protobuf.version>
<grpc.version>1.50.0</grpc.version>
Expand Down