Skip to content

Commit

Permalink
Fix hasMessageAvailable return true but can't read message (apache#10414
Browse files Browse the repository at this point in the history
)

### Motivation

I temporarily fixed this problem in PR apache#10190.
Now we have found a better way, this way can avoid the seek, then avoid trigger another reconnection.
Thank you @codelipenghui  to troubleshoot this issue with me all night.

We have added a lot of log and found that this issue is caused by some race condition problems. Here is the first reason:
https://github.com/apache/pulsar/blob/f2d72c9fc13a33df584ec1bd96a4c147774b858d/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1808-L1818
Now we have an acknowledgmentsGroupingTracker to filter duplicate messages, and this Tracker will be cleaned up after seek.

However, it is possible that the connection is ready and Broker has pushed message, but `acknowledgmentsGroupingTracker.flushAndClean(); ` has not been executed yet. 

Finally hasMessageAvailableAsync returns true, but the message cannot be read because it is filtered by the acknowledgmentsGroupingTracker


### Modifications
clean the tracker when connection was open

### Verifying this change
  • Loading branch information
315157973 authored May 4, 2021
1 parent 76c93a3 commit f69a03b
Showing 1 changed file with 4 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,9 @@ public void connectionOpened(final ClientCnx cnx) {
log.info("[{}][{}] Subscribing to topic on cnx {}, consumerId {}", topic, subscription, cnx.ctx().channel(), consumerId);

long requestId = client.newRequestId();
if (duringSeek.get()) {
acknowledgmentsGroupingTracker.flushAndClean();
}

int currentSize;
synchronized (this) {
Expand Down Expand Up @@ -1927,22 +1930,7 @@ public CompletableFuture<Boolean> hasMessageAvailableAsync() {
}

if (hasMoreMessages(lastMessageIdInBroker, startMessageId, resetIncludeHead)) {
//this situation will occur when :
// 1.We haven't read yet 2.The connection was reset multiple times
// 3.Broker has pushed messages to ReceiverQueue, but messages were cleaned due to connection reset
Backoff backoff = new BackoffBuilder()
.setInitialTime(100, TimeUnit.MILLISECONDS)
.setMax(2000, TimeUnit.MILLISECONDS)
.setMandatoryStop(client.getConfiguration().getOperationTimeoutMs(), TimeUnit.MILLISECONDS)
.create();
RetryUtil.retryAsynchronously(() -> {
try {
seek(startMessageId);
return true;
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}, backoff, pinnedExecutor, booleanFuture);
booleanFuture.complete(true);
return booleanFuture;
}

Expand Down

0 comments on commit f69a03b

Please sign in to comment.