-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[pulsar-client] add api to wait for inflight messages while closing producer #6648
Conversation
@rdhabalia The CI failed related to the checkstyle, please take a look. |
@rdhabalia The waiting for all the inflight messages should be the default current behavior (that's also what the javadoc says :) ). If that's not the case, then we should just fix the implementation but I don't think we need 2 options there. |
@merlimat |
return closeAsync(false); | ||
} | ||
|
||
public CompletableFuture<Void> closeAsync(boolean waitForPendingMessages) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method can now be deleted
|
||
private void closeAsync(boolean waitForPendingMessage, CompletableFuture<Void> closeFuture) { | ||
if (waitForPendingMessage && !pendingMessages.isEmpty()) { | ||
CLOSING_IN_PROGRESS_UPDATER.set(this, TRUE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we're not using any compareAndSet()
over the flag, then we won't need to use the updater.
return closeResult; | ||
} | ||
|
||
private void closeAsync(boolean waitForPendingMessage, CompletableFuture<Void> closeFuture) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We shouldn't need the 2 new methods if we only care about the case waitForPendingMessage=true
.
/pulsarbot run-failure-checks |
@rdhabalia Could you please rebase the master branch? |
ping @rdhabalia please rebase the branch. |
/pulsarbot run-failure-checks |
/pulsarbot run-failure-checks |
3 similar comments
/pulsarbot run-failure-checks |
/pulsarbot run-failure-checks |
/pulsarbot run-failure-checks |
@rdhabalia Looks the failed CI tests related to this PR, could you please take a look? Or can we move it to 2.7.0 first? |
move to 2.7.0 first. |
…roducer default closeAsync() should wait for inflight messages address comments
/pulsarbot run-failure-checks |
1 similar comment
/pulsarbot run-failure-checks |
@rdhabalia Could you please also rebase this PR to apache/master? |
/pulsarbot run-failure-checks |
@rdhabalia Could you please help take a look at the failed test? looks related to this change. |
Hi, @rdhabalia. It looks like this PR is opened for a long time, do we still need this? If we don't need this I will close this PR and remove the |
Because this PR is opened for a long time. I will close this PR. Feel free to reopen it if you needed. |
Motivation
Right now, when user closes the producer, pulsar-client immediately fails inflight messages even if they persist successfully at the broker also most of the times users don't want to fail those messages rather user wants to wait for those inflight messages. Right now, pulsar-client lib doesn't provide a way to wait for inflight messages before closing the producer.
Modification
Add close API with the flag where user can control to wait for inflight messages.
Result
With this change user can close producer by waiting for inflight messages and pulsar-client will not fail those messages immediately.