-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Issue 2912][pulsar-admin] add get-message-by-id cmd into pulsar-admin #6331
[Issue 2912][pulsar-admin] add get-message-by-id cmd into pulsar-admin #6331
Conversation
Test result on my local laptop:
|
@nlu90 I noticed that to get the message content by ID needs to specify a subscription name. Sorry I didn't clarify this matter in the issue. Is it better to not require the user to specify the message ID? No matter which subscription name is specified, the returned message content is the same. It's different with peek messages, users can peek one or more messages by peeking messages. So we don’t need to consider whether the message is acknowledged or not. Future more, the entry may represent a batch message. If I understand correctly, if it is a batch message, the format of the information received by the user may not be easy to read. And the message can encoded by different schema, we need to leverage by pulsar schema. This part can also be solved by a separate PR. At present, the peek message is not handled. |
For your first suggestion about removing subscription, I actually considered it when I was first implementing this functionality. But later I kept it there due to implementation simplicity. Now I removed it and will create a one-time subscription when fetching the message internally. Please take a look and let me know if you have any other suggestions. For your second suggestion -- handling batch message, I thins we can handle it it in a separate issue and PR. |
Now the command works as following:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left a comment related to using ManagedLedger to get a message directly. And please also update the documentation for the new admin method.
* @param callback | ||
* @param ctx | ||
*/ | ||
void asyncGetMessageById(long ledgerId, long entryId, IndividualDeletedEntries deletedEntries, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
According to what we discussed before, We don't need to specify a subscription name for getting a message by ID. So I think we can get a message from ManagedLedger directly by https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L1494. So that we don't need to create a one-time subscription.
Entry entry = null; | ||
try { | ||
PersistentSubscription subscription = | ||
(PersistentSubscription) topic.createSubscription(subName, InitialPosition.Earliest, false).get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as comments above
@@ -79,6 +80,8 @@ default long getNumberOfEntriesDelayed() { | |||
|
|||
CompletableFuture<Entry> peekNthMessage(int messagePosition); | |||
|
|||
CompletableFuture<Entry> getMessageById(long ledgerId, long entryId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we can use ManagedLedger directly, this method also can be removed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good suggestion. Fixed them in the following update.
@nlu90 Thanks for achieving it. The change looks good to me. |
@codelipenghui Thanks for all the suggestions. I removed the @sijie I think this feature should not be labelled with |
@nlu90 my bad. fixed it. |
} | ||
}, null); | ||
|
||
entry = future.get(1000, TimeUnit.MILLISECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for being late in reviewing this pull request. I would suggest implementing this using AsyncResponse. We have tried to move away from using sync methods.
You can check internalCreateSubscription
on how to use AsyncResponse.
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
Line 1305 in df15210
protected void internalCreateSubscription(AsyncResponse asyncResponse, String subscriptionName, |
@sijie @nlu90 I added |
@sijie updated with the asyncResponse to construct the result @codelipenghui added the doc into the following two md file: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@nlu90 overall looks good. one small comment regarding releasing the entry.
|
||
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); | ||
ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger(); | ||
Entry entry = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is needed any more.
clientAppId(), ledgerId, entryId, topicName, exception); | ||
asyncResponse.resume(new RestException(exception)); | ||
} finally { | ||
if (entry != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move this finally
block to line 1520
…mnative/pulsar into neng/add-get-message-by-id
It seems the code can not be compiled.
|
@Override | ||
public void readEntryComplete(Entry entry, Object ctx) { | ||
try { | ||
asyncResponse.resume(generateResponseWithEntry(entry)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need to release the Entry
it after using it.
@sijie The problem is caused by some conflicting commit. I've fixed them now. Also added the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@nlu90 awesome contribution!
/pulsarbot run-failure-checks |
apache#6331) Fixes apache#2912 ### Motivation Adding a new command `get-message-by-id` to the pulsar-admin which allows user to look at a single message by providing ledger id and entry id. ### Modifications - pulsar-admin includes the new command `get-message-by-id` - pulsar-broker v1/v2 apis to handle the get-message-by-id request from pulsar-admin - managedCursor to read from ledgers with given ledger id and entry id
Fixes #2912
Motivation
Adding a new command
get-message-by-id
to the pulsar-admin which allows user to look at a single message by providing ledger id and entry id.Modifications
get-message-by-id
Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesDocumentation