Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure #19031

Open
wants to merge 8 commits into
base: master
Choose a base branch
from

Conversation

BewareMyPower
Copy link
Contributor

Fixes #19030

Motivation

When a BatchMessageIdImpl is created from a deserialization, the BatchMessageAcker object cannot be shared by all instances in the same batch, which leads to an acknowledgment failure when batch index ACK is disabled (by default).

Modifications

Maintain a map from the (ledger id, entry id) pair to the BatchMessageAcker in ConsumerImpl. If the BatchMessageIdImpl doesn't carry a valid BatchMessageAcker, create and cache a BatchMessageAcker instance and remove it when all messages in the batch are acknowledged.

It requires a change in MessageIdImpl#fromByteArray that a BatchMessageAckerDisabled will be created to indicate there is no shared acker.

To avoid making code more complicated, this patch refactors the existing code that many logics about consumer are moved from the ACK tracker to the consumer. It also removes the AckType parameter when acknowledging a list of messages.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: BewareMyPower#15

@BewareMyPower BewareMyPower self-assigned this Dec 22, 2022
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Dec 22, 2022
@BewareMyPower BewareMyPower added type/bug The PR fixed a bug or issue reported a bug area/client labels Dec 22, 2022
@BewareMyPower BewareMyPower force-pushed the bewaremypower/message-id-deserialization-fix branch from 329e6ac to ba52f1e Compare December 22, 2022 15:48
@BewareMyPower
Copy link
Contributor Author

More explanations to the refactoring parts of this PR:

  1. I don't want to handle the list cumulative acknowledgment. Pulsar never supports this operation. There is no acknowledgeCumulative API that accepts a list of messages.
  2. Process the BatchMessageIdImpl in ConsumerImpl. The previous code processes it mainly in PersistentAcknowledgmentTracker, it appearly does not follow the OOP style and there are a lot of consumer.getXXX() calls. I don't want to write more code based on that, e.g. using consumer.getBatchMessageToAcker() everywhere.

Fixes apache#19030

### Motivation

When a `BatchMessageIdImpl` is created from a deserialization, the
`BatchMessageAcker` object cannot be shared by all instances in the same
batch, which leads to an acknowledgment failure when batch index ACK is
disabled (by default).

### Modifications

Maintain a map from the `(ledger id, entry id)` pair to the
`BatchMessageAcker` in `ConsumerImpl`. If the `BatchMessageIdImpl`
doesn't carry a valid `BatchMessageAcker`, create and cache a
`BatchMessageAcker` instance and remove it when all messages in the
batch are acknowledged.

It requires a change in `MessageIdImpl#fromByteArray` that a
`BatchMessageAckerDisabled` will be created to indicate there is no
shared acker.

To avoid making code more complicated, this patch refactors the existing
code that many logics about consumer are moved from the ACK tracker to
the consumer. It also removes the `AckType` parameter when acknowledging
a list of messages.
@BewareMyPower BewareMyPower force-pushed the bewaremypower/message-id-deserialization-fix branch from ba52f1e to 1eb278d Compare December 23, 2022 03:18
@BewareMyPower
Copy link
Contributor Author

Thanks for your reviews, I'm going to address the comments soon.

@BewareMyPower
Copy link
Contributor Author

@congbobo184 You comments are addressed now, PTAL again.

@@ -267,10 +194,9 @@ private CompletableFuture<Void> doIndividualAck(MessageIdImpl messageId, Map<Str
}


private CompletableFuture<Void> doIndividualAckAsync(MessageIdImpl messageId) {
private void doIndividualAckAsync(MessageIdImpl messageId) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can an Async variant be void? Especially doIndividualBatchAck below returns CompletableFuture<Void>.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current names of these two "Async" methods are confusing. I will rename them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this PR already includes some refactorings, I might do that in a following PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@BewareMyPower Yeah. That sounds good.

Co-authored-by: tison <wander4096@gmail.com>
@codecov-commenter
Copy link

codecov-commenter commented Dec 28, 2022

Codecov Report

Merging #19031 (15774bc) into master (492a9c3) will decrease coverage by 0.27%.
The diff coverage is 16.66%.

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #19031      +/-   ##
============================================
- Coverage     47.46%   47.18%   -0.28%     
+ Complexity    10727    10645      -82     
============================================
  Files           711      712       +1     
  Lines         69456    69492      +36     
  Branches       7452     7460       +8     
============================================
- Hits          32964    32789     -175     
- Misses        32810    33021     +211     
  Partials       3682     3682              
Flag Coverage Δ
unittests 47.18% <16.66%> (-0.28%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
...ar/client/impl/AcknowledgmentsGroupingTracker.java 0.00% <0.00%> (ø)
...va/org/apache/pulsar/client/impl/ConsumerBase.java 21.51% <0.00%> (ø)
...he/pulsar/client/impl/MultiTopicsConsumerImpl.java 22.80% <0.00%> (ø)
...pl/NonPersistentAcknowledgmentGroupingTracker.java 33.33% <0.00%> (-4.17%) ⬇️
...va/org/apache/pulsar/client/impl/ConsumerImpl.java 14.50% <1.31%> (-0.60%) ⬇️
...impl/PersistentAcknowledgmentsGroupingTracker.java 61.95% <48.00%> (+3.05%) ⬆️
.../apache/pulsar/client/impl/BatchMessageIdImpl.java 67.50% <100.00%> (ø)
...a/org/apache/pulsar/client/impl/MessageIdImpl.java 80.43% <100.00%> (-0.42%) ⬇️
...lsar/client/impl/conf/ClientConfigurationData.java 96.66% <100.00%> (ø)
.../pulsar/broker/service/SharedConsumerAssignor.java 3.70% <0.00%> (-64.82%) ⬇️
... and 52 more

Comment on lines 108 to 123
public boolean ackIndividual() {
return acker.ackIndividual(batchIndex);
}

public boolean ackCumulative() {
return acker.ackCumulative(batchIndex);
}

public int getOutstandingAcksInSameBatch() {
return acker.getOutstandingAcks();
}

public int getBatchSize() {
return acker.getBatchSize();
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are the APIs exposed by the client, can they be deleted directly without any compatibility?

}
} else {
if (acker.ackCumulative(messageId.getBatchIndex())) {
batchMessageToAcker.remove(Pair.of(messageId.getLedgerId(), messageId.getEntryId()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't everything before this msgId be removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes sense to me. I will handle this and add a new test for it.

}
if (messageId instanceof BatchMessageIdImpl) {
BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
MessageIdImpl messageIdImpl = getMessageIdToAcknowledge(batchMessageId, ackType);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to change the name of this method to AcknowledgeAndGet? Just like Atomic's IncreaseAndGet

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. This method is to get the MessageIdImpl to acknowledge. increaseAndGet modifies the atomic variable itself, but getMessageIdToAcknowledge just calculated a MessageIdImpl to acknowledge from the BatchMessageIdImpl and does not modify the BatchMessageIdImpl itself.

messageIds.forEach(messageId ->
completableFutureSet.add(addAcknowledgment((MessageIdImpl) messageId, ackType, properties)));
return FutureUtil.waitForAll(new ArrayList<>(completableFutureSet));
public CompletableFuture<Void> addListAcknowledgment(List<MessageIdImpl> messageIds,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A public API is directly modified, and it is incompatible. If a user encapsulates this API and passes in other MessageId implementation classes, the old interface will be unavailable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AcknowledgmentsGroupingTracker is not marked with any InterfaceStability annotation like Stable and Evolving. The pulsar-client-original module contains classes as the "internal implementations" so that new classes and interfaces were added very casually. If we care much about the breaking changes of these "internal implementations", the code would be very hard to maintain.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just thought again, this PR should be able to cherry-pick into release branches, so I restored the API compatibility and mark those APIs that are never used as deprecated.

@BewareMyPower
Copy link
Contributor Author

Your comments are addressed now, PTAL @315157973

BTW, since I restored the API compatibility so I added the release/x.y.z labels.

@@ -204,6 +208,10 @@

private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
// Key is the ledger id and the entry id, entry is the acker that represents which single messages are acknowledged
private final ConcurrentNavigableMap<Pair<Long, Long>, BatchMessageAcker> batchMessageToAcker =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also consider cleaning up the batchMessageToAcker

  • After the seek operation
  • After the message has been redelivered(Nack or Ack timeout)
  • After the consumer reconnects to the broker? (I'm not 100% sure about this part, the cursor-reset might happened on the broker side)

Copy link
Contributor Author

@BewareMyPower BewareMyPower Dec 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't get it. A new map entry could only be only added when a message was acknowledged.

    private MessageIdImpl getMessageIdToAcknowledge(BatchMessageIdImpl messageId, AckType ackType) {
        final BatchMessageAcker acker;
        if (messageId.getAcker() instanceof BatchMessageAckerDisabled) {
            acker = batchMessageToAcker.computeIfAbsent(

Why should we clean up the batchMessageToAcker after a seek operation?

Regarding the message redelivering or reconnection, if the batch message id was not created by deserialization, the acker inside the BatchMessageIdImpl would not be modified. If we only clean up the batchMessageToAcker for deserialized batch message id, the behavior would be different.

For example, assuming there are 2 batch message ids (id0 and id1) of the same entry and for the following steps:

  1. Acknowledge id0
  2. Reconnection
  3. Acknowledge id1

If they are retrieved from the deserialization, the entry will not be acknowledged because the 2nd step cleared the cache, and the BatchMessageAcker will be "XO" (X represents not acknowledged).

However, if they are just saved from the message.getMessageId(), the entry will be acknowledged because the shared BatchMessageAcker is "OO" and not affected by the reconnection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The batchMessageAckTracker before #1424 is updated for each message received:

    void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, ByteBuf uncompressedPayload,
            MessageIdData messageId, ClientCnx cnx) {
        /* ... */
        batchMessageAckTracker.put(batchMessage, bitSet);

But the batchMessageToAcker in this PR will be updated only for acknowlegment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why should we clean up the batchMessageToAcker after a seek operation?

The seek API will lead the consumer to consume from a new start message ID. It can be an earlier position.
Suppose you have 3 batch messages.

0:(0,1,2,3),
1:(0,1,2),
2:(0,1,2,3,4,5)

The message 0, 1, and 2:(0,2,4) are acknowledged. Then the consumer seeks to the earliest position.
Then the consumer will receive messages 0, 1, and 2 again.
Due to we have 2:(0,2,4) in the batchMessageToAcker and If only 2:(1,3,5) is acked after the seek operation.
But the client will ack the whole message 2.

From a user perspective, it can be a data loss. We should guarantee the at-lease-once semantic after the seek operation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding the message redelivering or reconnection, if the batch message id was not created by deserialization, the acker inside the BatchMessageIdImpl would not be modified. If we only clean up the batchMessageToAcker for deserialized batch message id, the behavior would be different.

If the user decides to nack the message, they should not continue to ack it. After the message redelivery, the newly received message with a new Acker. But this PR introduced a shared state. It looks like the newly received message with the different acker can also associate with the old ack state if we don't clean up the shared state.

Copy link
Contributor

@codelipenghui codelipenghui Dec 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Users should not continue to process/ack the messages before the seek operation. It's not the same case as I provided which is normal usage.

And we should ensure after the seek operation, we should not return the cached messages before the seek operation to users. If users try to cache the messages or just inflight messages, users need to guarantee they will not ack the inflight or cached messages before the seek operation.

BTW, if users left one message ID (messageIdList.get(4)) to acknowledge after seek and they didn't acknowledge other message IDs, what will they expect? Did they expect messages could be received again?

Yes, they should receive all the messages after the new seek position no matter what happened before the seek operation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The point I insisted on is, the following two code snippets should be equivalent.

        final List<BatchMessageIdImpl> msgIds = new ArrayList<>();
        for (int i = 0; i < numMessages; i++) {
            msgIds.add((BatchMessageIdImpl) consumer.receive().getMessageId());
        }
        final List<BatchMessageIdImpl> msgIds = new ArrayList<>();
        for (int i = 0; i < numMessages; i++) {
            final MessageIdImpl messageId = (MessageIdImpl) consumer.receive().getMessageId();
            MessageId deserialized = MessageId.fromByteArray(messageId.toByteArray());
            msgIds.add((BatchMessageIdImpl) deserialized);
        }

This PR works well for the assumption above.

Your solution to the corner cases you described is very hacky. You created the same (i.e. equals returns true) message ID and archive a different goal.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If users try to cache the messages or just inflight messages, users need to guarantee they will not ack the inflight or cached messages before the seek operation.

Just like this rule you've mentioned here, when message IDs from deserializations are used, there is a rule that all message IDs should be acknowledged (for simplicity, it does not count the cumulative ACK case). IMO, if users load a set of message IDs and he only acknowledges partial of them before seeking, it should be an incorrect use.

It's not the same case as I provided which is normal usage.

As I've explained above, it's not normal usage when you use message IDs from deserializations.

The wrong code users should not write:

var id0 = loadMessageId();
consumer.acknowledge(id0); // batch index: 0, batch size: 2
var id1 = loadMessageId(); // batch index: 1, batch size: 2
// User does not acknowledge id1 before seek
consumer.seek(MessageId.earliest);
// Instead, user acknowledges the outdated id1 after seek.
consumer.acknowledge(id1);

The correct code users should write:

var id0 = loadMessageId();
consumer.acknowledge(id0); // batch index: 0, batch size: 2
consumer.seek(MessageId.earliest);
storeMessageId(consumer.receive().getMessageId());  // batch index: 0, batch size: 2
storeMessageId(consumer.receive().getMessageId());  // batch index: 1, batch size: 2
var id1 = loadMessageId(); // batch index: 0, batch size: 2
var id2 = loadMessageId(); // batch index: 1, batch size: 2
consumer.acknowledge(id1);
consumer.acknowledge(id2);

Regarding the reconnection or ack failure, it's just the same. When acknowledgeAsync is called on all message IDs of a batch, the cache will be removed no matter if the acknowledgment succeeded. If only partial message IDs of a batch are acknowledged, when users received messages again, they should persist message IDs again and use the new message IDs, including the repeated positions, instead of the old message IDs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The id2 can be acked first, but the client crashed while processing message 1 (Shared subscription and users multiple threads to process messages). I mean that users do not want to miss the ack intentionally. But we can't guarantee the messages of a batch will be processed in order of a Shared subscription. Even if it is a Failover or Exclusive subscription, users can also ack individually and process messages out of order.

Regarding the reconnection or ack failure

I point out this one because a server-side cursor reset can cause the reconnection. But it should not be a problem that this PR should fix.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just thought again. If we have to use completely new BatchMessageIdImpl objects in regular cases, i.e. retrieve MessageId from Message, in the same case when BatchMessageIdImpl objects are retrieved from deserializations, the cache should be cleared.

BewareMyPower added a commit to BewareMyPower/pulsar-client-cpp that referenced this pull request Jan 17, 2023
### Motivation

Currently the main branch is broken by the concurrent merge of
apache#153 and
apache#151. It's because when
a batched message id is constructed from deserialization, there is no
`getBitSet` implementation of the internal acker.

### Modifications

Add a `bool` parameter to `MessageIdImpl::getBitSet` to indicate whether
the message ID is batched. The logic is similar with

https://github.com/apache/pulsar/blob/299bd70fdfa023768e94a8ee4347d39337b6cbd4/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java#L325-L327

and

https://github.com/apache/pulsar/blob/299bd70fdfa023768e94a8ee4347d39337b6cbd4/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java#L345-L347

Add a `testMessageIdFromBuild` to test the acknowledgment for a message
ID without an acker could succeed for a consumer that enables batch
index ACK.

### TODO

In future, apache/pulsar#19031 might be migrated
into the C++ client to fix the consumer that disables batch index ACK.
@github-actions
Copy link

github-actions bot commented Feb 3, 2023

The pr had no activity for 30 days, mark with Stale label.

@michaeljmarshall
Copy link
Member

As discussed on the mailing list https://lists.apache.org/thread/w4jzk27qhtosgsz7l9bmhf1t7o9mxjhp, there is no plan to release 2.9.6, so I am going to remove the release/2.9.6 label

@github-actions
Copy link

The pr had no activity for 30 days, mark with Stale label.

@github-actions github-actions bot added the Stale label Jul 29, 2023
@Technoboy- Technoboy- added this to the 3.2.0 milestone Jul 31, 2023
@Technoboy- Technoboy- modified the milestones: 3.2.0, 3.3.0 Dec 22, 2023
@coderzc coderzc modified the milestones: 3.3.0, 3.4.0 May 8, 2024
@lhotari lhotari modified the milestones: 4.0.0, 4.1.0 Oct 14, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Bug] Deserialized BatchMessageIdImpl cannot be used for acknowledgment