Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the flush callback might be called repeatedly #353

Merged

Conversation

BewareMyPower
Copy link
Contributor

Fixes #352

Motivation

#303 adds the flush callback to the last OpSendMsg instead of adding to the batch message container. However, batchMessageAndSend will create an OpSendMsg and add it to the pendingMessagesQueue_.

auto failures = batchMessageAndSend(callback);
if (!pendingMessagesQueue_.empty()) {
auto& opSendMsg = pendingMessagesQueue_.back();
lock.unlock();
failures.complete();
opSendMsg->addTrackerCallback(callback);

In the code above, pendingMessagesQueue_ could never be empty and the callback will be added again by opSendMsg->addTrackerCallback. The 1st time it's added in createOpSendMsg or createOpSendMsgs called by batchMessageAndSend.

Motivation

Add the callback to the last `OpSendMsg only when the batch message container is empty.

In testFlushBatch, replace the flush call with the flushAsync call and verify the callback is only called once after it's completed.

Documentation

  • doc-required
    (Your PR needs to update docs and you will update later)

  • doc-not-needed
    (Please explain why)

  • doc
    (Your PR contains doc changes)

  • doc-complete
    (Docs have been already added)

Fixes apache#352

### Motivation

apache#303 adds the flush
callback to the last `OpSendMsg` instead of adding to the batch message
container. However, `batchMessageAndSend` will create an `OpSendMsg` and
add it to the `pendingMessagesQueue_`.

https://github.com/apache/pulsar-client-cpp/blob/7bb94f45b917ed30b5302ac93ffa1f1942fc6313/lib/ProducerImpl.cc#L384-L389

In the code above, `pendingMessagesQueue_` could never be empty and the
callback will be added again by `opSendMsg->addTrackerCallback`. The 1st
time it's added in `createOpSendMsg` or `createOpSendMsgs` called by
`batchMessageAndSend`.

### Motivation

Add the callback to the last `OpSendMsg only when the batch message
container is empty.

In `testFlushBatch`, replace the `flush` call with the `flushAsync` call
and verify the callback is only called once after it's completed.
@BewareMyPower BewareMyPower added the bug Something isn't working label Nov 20, 2023
@BewareMyPower BewareMyPower added this to the 3.4.1 milestone Nov 20, 2023
@BewareMyPower BewareMyPower self-assigned this Nov 20, 2023
@BewareMyPower BewareMyPower merged commit 37ea769 into apache:main Nov 21, 2023
11 checks passed
@BewareMyPower BewareMyPower deleted the bewaremypower/fix-flush-twice branch November 21, 2023 02:42
BewareMyPower added a commit that referenced this pull request Nov 21, 2023
Fixes #352

### Motivation

#303 adds the flush
callback to the last `OpSendMsg` instead of adding to the batch message
container. However, `batchMessageAndSend` will create an `OpSendMsg` and
add it to the `pendingMessagesQueue_`.

https://github.com/apache/pulsar-client-cpp/blob/7bb94f45b917ed30b5302ac93ffa1f1942fc6313/lib/ProducerImpl.cc#L384-L389

In the code above, `pendingMessagesQueue_` could never be empty and the
callback will be added again by `opSendMsg->addTrackerCallback`. The 1st
time it's added in `createOpSendMsg` or `createOpSendMsgs` called by
`batchMessageAndSend`.

### Motivation

Add the callback to the last `OpSendMsg only when the batch message
container is empty.

In `testFlushBatch`, replace the `flush` call with the `flushAsync` call
and verify the callback is only called once after it's completed.

(cherry picked from commit 37ea769)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Bug] pulsar_producer_flush_async will received two callbacks
2 participants