Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PIP-150][improve][broker] Support read the message of startMessageId position on the broker side #15568

Closed
wants to merge 1 commit into from

Conversation

nodece
Copy link
Member

@nodece nodece commented May 12, 2022

Fixes #14883

Motivation

See #14883

Modifications

  • In broker side, add a start_message_id_inclusive field to CommandSubscribe message
  • Client supports set the start_message_id_inclusive value of CommandSubscribe message
  • Improve NonDurableCursorImpl.java and ManagedCursorImpl.java supports reading the message of startMessageId position

Verifying this change

  • Make sure that the change passes the CI checks.
  • Added tests

Documentation

Need to update docs?

  • no-need-doc

  • doc-not-needed

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label May 12, 2022
@nodece nodece force-pushed the PIP-150 branch 2 times, most recently from 216b7a7 to dacd590 Compare May 12, 2022 16:48
@nodece
Copy link
Member Author

nodece commented May 13, 2022

/pulsarbot rerun-failure-checks

3 similar comments
@nodece
Copy link
Member Author

nodece commented May 13, 2022

/pulsarbot rerun-failure-checks

@nodece
Copy link
Member Author

nodece commented May 19, 2022

/pulsarbot rerun-failure-checks

@nodece
Copy link
Member Author

nodece commented May 21, 2022

/pulsarbot rerun-failure-checks

@nodece
Copy link
Member Author

nodece commented May 27, 2022

Blocked by #15761

@nodece
Copy link
Member Author

nodece commented May 31, 2022

/pulsarbot rerun-failure-checks

Copy link
Member

@shibd shibd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some comments.

@@ -48,39 +46,46 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
// Start from last entry
switch (initialPosition) {
case Latest:
initializeCursorPosition(ledger.getLastPositionAndCounter());
initializeCursorPosition(ledger.getLastPositionAndCounter(), inclusive);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the PIP. Just when startMessageId == latest, need judge inclusive. We can keep the initializeCursorPosition method unchanged and add inclusive param to the getLastPositionAndCounter method.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

initializeCursorPosition will read the next valid position from this position, so I think to handle this is better in initializeCursorPosition.

@nodece
Copy link
Member Author

nodece commented Jun 15, 2022

/pulsarbot rerun-failure-checks

@nodece
Copy link
Member Author

nodece commented Jun 15, 2022

Then blocked by org.apache.pulsar.broker.service.SubscriptionSeekTest#testSeekByFunctionAndMultiTopic, repeated messages are often received.

@@ -242,11 +242,13 @@ ManagedCursor openCursor(String name, InitialPosition initialPosition) throws In
* operation will trigger the creation of the cursor.
* @param cursorProperties
* the properties for the Cursor
* @param inclusive
* whether to read from the specified position
* @return the ManagedCursor
* @throws ManagedLedgerException
*/
ManagedCursor openCursor(String name, InitialPosition initialPosition, Map<String, Long> properties,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add default override methods for compatibility? Because they are common APIs that can be used by other components like protocol handler.

e.g.

    default void asyncOpenCursor(String name, InitialPosition initialPosition, Map<String, Long> properties,
                         Map<String, String> cursorProperties, OpenCursorCallback callback, Object ctx) {
        asyncOpenCursor(name, initialPosition, properties, cursorProperties, false, callback, ctx);
    }

@nodece
Copy link
Member Author

nodece commented Jul 7, 2022

This PR has been blocked by #16171.

… position on the broker side

Signed-off-by: Zixuan Liu <nodeces@gmail.com>
@github-actions
Copy link

The pr had no activity for 30 days, mark with Stale label.

@github-actions github-actions bot added the Stale label Aug 15, 2022
@nodece
Copy link
Member Author

nodece commented Mar 25, 2023

No reviewer, so close this PR.

@nodece nodece closed this Mar 25, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
doc-not-needed Your PR changes do not impact docs Stale type/PIP
Projects
None yet
Development

Successfully merging this pull request may close these issues.

PIP-150: Support read the message of startMessageId position on the broker side
3 participants