-
Notifications
You must be signed in to change notification settings - Fork 54
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-36180] Fix batch message data loss #95
base: main
Are you sure you want to change the base?
Conversation
messageIdImpl.getPartitionIndex(), | ||
messageIdImpl.getBatchIndex() + 1, | ||
messageIdImpl.getBatchSize(), | ||
messageIdImpl.getAckSet()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we change the act set here for making sure the current batch index has been acknowledged?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Batch Message is a quite complex feature. How could we verify the acknowledge is acked cumulately in batch message? It could be acked individually.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we change the act set here for making sure the current batch index has been acknowledged?
There is no need to modify this AckSet, because we finally call Consumer seek. In the seek method, the messages before batchIndex will be cumulative acked. The AckSet here will not work regardless of its status.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Batch Message is a quite complex feature. How could we verify the acknowledge is acked cumulately in batch message? It could be acked individually.
-
In flink-pulsar-connector, the receive queue of the consumer is set to 1, will the message before the current batchIndex not be confirmed?
-
Even if a single message is confirmed, the current connector does not support BatchIndexAck.
-
The seek operation during task failure recovery cannot guarantee that AckSet will work, as I said above.
-
The current connector's ack behavior is cumulative confirmation
Finally, if we don't change the seeking behavior of CursorPosition, we won't be able to recover from AckSet regardless of the AckSet in the state.
The changes in this PR are valid under the current cumulative acknowledgment behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- The receive queue setting
Line 282 in b37a8b3
public static final ConfigOption<Integer> PULSAR_RECEIVER_QUEUE_SIZE = 1000
- IIUC, the support for the batch
AckSet
is achieved locally by the pulsar-client after all the batch message has been acked. (BTW, this shouldn't be touched by the connector user and developer, which should be promised by the pulsar client developer.) - The recover is queried from the checkpoint saved
MessageId
. Which the AckSet is controlled internally by the client I think.
@@ -196,7 +197,14 @@ public void handleSplitsChanges(SplitsChange<PulsarPartitionSplit> splitsChanges | |||
MessageId latestConsumedId = registeredSplit.getLatestConsumedId(); | |||
|
|||
if (latestConsumedId != null) { | |||
LOG.info("Reset subscription position by the checkpoint {}", latestConsumedId); | |||
if (latestConsumedId instanceof BatchMessageIdImpl) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer to use MessageIdAdv
globally.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MessageIdAdv is inaccurate. It contains implementations such as MessageIdImpl, not BatchMessageId. Here we want to print out the correct batchSize.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can see that all the message implementation implement the MessageIdAdv
interface. Which contains all the required information for the client. I think it's more better to use MessageIdAdv instead of the MessageId
here in the whole connector.
/**
* The {@link MessageId} interface provided for advanced users.
* <p>
* All built-in MessageId implementations should be able to be cast to MessageIdAdv.
* </p>
*/
Thanks for your pull request @wenbingshen. I left some questions here. Can you check them? |
@syhily I responded above, PTAL. Thanks. |
After discussing with @wenbingshen I think this PR do fixed the batch message consuming issues when the connector is recovered from the checkpoint. LGTM. @reswqa @tisonkun Can you double confirm this PR and merge it? |
Purpose of the change
When restoring from the state, the unconsumed messages of the batch will be lost.
we should return the next batch index single message for current batch.