Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Client] Fix message being ignored when the non-persistent topic reader reconnect. #12348

Merged
merged 1 commit into from
Oct 14, 2021

Conversation

RobertIndie
Copy link
Member

@RobertIndie RobertIndie commented Oct 13, 2021

Motivation

Consumers should not perform message ignore checks when consuming messages from non-persistent topics.
Otherwise, it may lead to a case where the message is incorrectly ignored when the non-persistent reader(or non-durable subscription non-persistent consumer) reconnects to the broker.

Currently, when the reader of the non-persistent topic has its currentMessageQueue empty before reconnection, its startMessageId is set to lastDequeuedMessageId after reconnection:

return new BatchMessageIdImpl((MessageIdImpl) lastDequeuedMessageId);

and if we specify startMessageId as the default value (-1:-1), it will be set to (0:0) after reconnection, thus causes all subsequent messages to be ignored:

if (isSameEntry(msgId) && isPriorEntryIndex(messageId.getEntryId())) {
// We need to discard entries that were prior to startMessageId
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", subscription,
consumerName, startMessageId);
}
uncompressedPayload.release();
return;
}

and for batch messages here:
if (isSameEntry(messageId) && isPriorBatchIndex(index)) {
// If we are receiving a batch message, we need to discard messages that were prior
// to the startMessageId
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", subscription,
consumerName, startMessageId);
}
return null;
}

Modifications

  • Remove the message ignore check when the topic of the consumer is non-persistent

Verifying this change

This change is already covered by existing tests, such as testReaderReconnectAndRead and testReaderReconnectAndReadBatchMessages.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)
  • The rest endpoints: (no)
  • The admin cli options: (no)
  • Anything that affects deployment: (no)

Documentation

  • no-need-doc

This is a bug fix.

…ect.

Signed-off-by: Zike Yang <ar@armail.top>
@Anonymitaet Anonymitaet added the doc-not-needed Your PR changes do not impact docs label Oct 14, 2021
@codelipenghui codelipenghui merged commit 867d71c into apache:master Oct 14, 2021
Copy link
Contributor

@hangc0276 hangc0276 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice Catch!

zeo1995 pushed a commit to zeo1995/pulsar that referenced this pull request Oct 14, 2021
* up/master: (37 commits)
  re-enabling integration tests for Sinks (apache#12307)
  [PIP 95][Issue 12040][web] Topic lookup with listener header (apache#12072)
  Fix the master CI broken with update dispatch rate block issue (apache#12360)
  Fix message being ignored when the non-persistent topic reader reconnect. (apache#12348)
  Fix log format. (apache#12346)
  [website][upgrade]feat: docs migration - version-2.7.2 Concepts and Architecture (apache#12354)
  [website][upgrade] feat: full docs migration for version 2.8.0 (apache#12359)
  [website][upgrade]feat: dynamic replace version info before build (apache#12337)
  Fix flaky tests: ElasticSearchClientTests (apache#12347)
  Use asyncCloseCursorLedger to replace cursorLedger.asyncClose method in the ManagedCursorImpl.VoidCallback#operationComplete (apache#12113)
  fix-npe-ZkBookieRackAffinityMapping (apache#11947)
  [pulsar-admin] Allow setting --forward-source-message-property to false when updating a pulsar function (apache#12128)
  [website][upgrade]feat: docs migration - Development (apache#12320)
  Update delete inactive topic configuration documentation (apache#12350)
  [PIP 95][Issue 12040][broker] Multiple bind addresses for Pulsar protocol (apache#12056)
  Added Debezium Source for MS SQL Server (apache#12256)
  Fix: flaky oracle tests (apache#12306)
  [C++] Use URL encoded content type for OAuth 2.0 authentication (apache#12341)
  [C++] Handle OAuth 2.0 exceptional cases gracefully (apache#12335)
  feat(cli): add restart command to pulsar-daemon (apache#12279)
  ...

# Conflicts:
#	site2/website-next/docusaurus.config.js
#	site2/website-next/versioned_sidebars/version-2.7.2-sidebars.json
#	site2/website-next/versions.json
codelipenghui pushed a commit that referenced this pull request Oct 15, 2021
…ect. (#12348)

Consumers should not perform message ignore checks when consuming messages from non-persistent topics.
Otherwise, it may lead to a case where the message is incorrectly ignored when the non-persistent reader(or non-durable subscription non-persistent consumer) reconnects to the broker.

Currently, when the reader of the non-persistent topic has its `currentMessageQueue` empty before reconnection, its `startMessageId` is set to `lastDequeuedMessageId` after reconnection: https://github.com/apache/pulsar/blob/4ae7f6a1b38a003c9fc26844e52771b776bf64bf/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L856

 and if we specify `startMessageId` as the default value (-1:-1), it will be set to (0:0) after reconnection, thus causes all subsequent messages to be ignored:
https://github.com/apache/pulsar/blob/4ae7f6a1b38a003c9fc26844e52771b776bf64bf/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1202-L1211
and for batch messages here:
https://github.com/apache/pulsar/blob/4ae7f6a1b38a003c9fc26844e52771b776bf64bf/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1029-L1037

* Remove the message ignore check when the topic of the consumer is non-persistent

(cherry picked from commit 867d71c)
@hangc0276 hangc0276 added the cherry-picked/branch-2.8 Archived: 2.8 is end of life label Oct 17, 2021
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request Oct 27, 2021
…ect. (apache#12348)

Consumers should not perform message ignore checks when consuming messages from non-persistent topics.
Otherwise, it may lead to a case where the message is incorrectly ignored when the non-persistent reader(or non-durable subscription non-persistent consumer) reconnects to the broker.

Currently, when the reader of the non-persistent topic has its `currentMessageQueue` empty before reconnection, its `startMessageId` is set to `lastDequeuedMessageId` after reconnection: https://github.com/apache/pulsar/blob/4ae7f6a1b38a003c9fc26844e52771b776bf64bf/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L856

 and if we specify `startMessageId` as the default value (-1:-1), it will be set to (0:0) after reconnection, thus causes all subsequent messages to be ignored:
https://github.com/apache/pulsar/blob/4ae7f6a1b38a003c9fc26844e52771b776bf64bf/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1202-L1211
and for batch messages here:
https://github.com/apache/pulsar/blob/4ae7f6a1b38a003c9fc26844e52771b776bf64bf/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1029-L1037

* Remove the message ignore check when the topic of the consumer is non-persistent

(cherry picked from commit 867d71c)
(cherry picked from commit 8e2b003)
@codelipenghui codelipenghui modified the milestones: 2.10.0, 2.9.0 Dec 20, 2021
bharanic-dev pushed a commit to bharanic-dev/pulsar that referenced this pull request Mar 18, 2022
…ect. (apache#12348)

### Motivation

Consumers should not perform message ignore checks when consuming messages from non-persistent topics.
Otherwise, it may lead to a case where the message is incorrectly ignored when the non-persistent reader(or non-durable subscription non-persistent consumer) reconnects to the broker.

Currently, when the reader of the non-persistent topic has its `currentMessageQueue` empty before reconnection, its `startMessageId` is set to `lastDequeuedMessageId` after reconnection: https://github.com/apache/pulsar/blob/4ae7f6a1b38a003c9fc26844e52771b776bf64bf/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L856

 and if we specify `startMessageId` as the default value (-1:-1), it will be set to (0:0) after reconnection, thus causes all subsequent messages to be ignored:
https://github.com/apache/pulsar/blob/4ae7f6a1b38a003c9fc26844e52771b776bf64bf/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1202-L1211
and for batch messages here:
https://github.com/apache/pulsar/blob/4ae7f6a1b38a003c9fc26844e52771b776bf64bf/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1029-L1037

### Modifications

* Remove the message ignore check when the topic of the consumer is non-persistent
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
cherry-picked/branch-2.8 Archived: 2.8 is end of life doc-not-needed Your PR changes do not impact docs release/2.8.2
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants